From d8349c9025d7a4693e3525be2cd7a18e7659cc6a Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Mon, 11 Mar 2024 17:24:29 +0800 Subject: [PATCH 1/9] save work --- Cargo.lock | 102 ++++++++++++++- src/connector/src/sink/mod.rs | 4 + src/connector/src/sink/opendal/gcs.rs | 23 ++++ src/connector/src/sink/opendal/mod.rs | 46 +++++++ src/connector/src/sink/opendal/s3.rs | 174 ++++++++++++++++++++++++++ 5 files changed, 346 insertions(+), 3 deletions(-) create mode 100644 src/connector/src/sink/opendal/gcs.rs create mode 100644 src/connector/src/sink/opendal/mod.rs create mode 100644 src/connector/src/sink/opendal/s3.rs diff --git a/Cargo.lock b/Cargo.lock index bf8d3c00df2d..928de9d3f645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,17 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + [[package]] name = "ahash" version = "0.7.7" @@ -1647,6 +1658,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-padding" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.3.1" @@ -1932,6 +1952,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" +[[package]] +name = "cbc" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6" +dependencies = [ + "cipher", +] + [[package]] name = "cc" version = "1.0.83" @@ -2046,6 +2075,16 @@ dependencies = [ "half 1.8.2", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -5232,6 +5271,16 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "block-padding", + "generic-array", +] + [[package]] name = "inquire" version = "0.7.0" @@ -7265,6 +7314,16 @@ dependencies = [ "prost-types 0.11.9", ] +[[package]] +name = "pbkdf2" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -7437,6 +7496,21 @@ dependencies = [ "spki 0.7.2", ] +[[package]] +name = "pkcs5" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6" +dependencies = [ + "aes", + "cbc", + "der 0.7.8", + "pbkdf2", + "scrypt", + "sha2", + "spki 0.7.2", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -7454,6 +7528,8 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ "der 0.7.8", + "pkcs5", + "rand_core", "spki 0.7.2", ] @@ -8469,9 +8545,9 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.14.6" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd" +checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5" dependencies = [ "anyhow", "async-trait", @@ -8496,7 +8572,6 @@ dependencies = [ "serde_json", "sha1", "sha2", - "tokio", ] [[package]] @@ -10313,6 +10388,7 @@ dependencies = [ "pkcs1", "pkcs8 0.10.2", "rand_core", + "sha2", "signature 2.0.0", "spki 0.7.2", "subtle", @@ -10545,6 +10621,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -10611,6 +10696,17 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.0" diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7daf0883ac7e..858492b83b82 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -29,6 +29,7 @@ pub mod kinesis; pub mod log_store; pub mod mock_coordination_client; pub mod nats; +pub mod opendal; pub mod pulsar; pub mod redis; pub mod remote; @@ -88,6 +89,7 @@ macro_rules! for_all_sinks { { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, + { S3, $crate::sink::opendal::s3::S3Sink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, @@ -525,6 +527,8 @@ pub enum SinkError { ), #[error("Starrocks error: {0}")] Starrocks(String), + #[error("S3 error: {0}")] + S3(String), #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/sink/opendal/gcs.rs b/src/connector/src/sink/opendal/gcs.rs new file mode 100644 index 000000000000..09c0891ee843 --- /dev/null +++ b/src/connector/src/sink/opendal/gcs.rs @@ -0,0 +1,23 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// impl OpendalSinkWriter { +// /// create opendal gcs sink. +// pub fn new_gcs_sink( +// gcs_properties: gcsPropertiesCommon, +// assume_role: Option, +// ) -> Result { +// todo!(); +// } +// } diff --git a/src/connector/src/sink/opendal/mod.rs b/src/connector/src/sink/opendal/mod.rs new file mode 100644 index 000000000000..50b3f05fc18f --- /dev/null +++ b/src/connector/src/sink/opendal/mod.rs @@ -0,0 +1,46 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod s3; +// pub mod gcs; + +pub const GCS_SINK: &str = "gcs"; + +// pub trait OpendalSink: Send + Sync + 'static + Clone + PartialEq { +// type Properties: SourceProperties + Send + Sync; + +// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult>; +// } + +// #[derive(Debug, Clone, Copy, PartialEq, Eq)] +// pub struct OpendalS3; + +// impl OpendalSink for OpendalS3 { +// type Properties = OpendalS3Properties; + +// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult> { +// todo!() +// } +// } + +// #[derive(Debug, Clone, Copy, PartialEq, Eq)] +// pub struct OpendalGcs; + +// impl OpendalSink for OpendalGcs { +// type Properties = GcsProperties; + +// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult> { +// OpendalEnumerator::new_gcs_source(properties) +// } +// } diff --git a/src/connector/src/sink/opendal/s3.rs b/src/connector/src/sink/opendal/s3.rs new file mode 100644 index 000000000000..4f1492dbbefa --- /dev/null +++ b/src/connector/src/sink/opendal/s3.rs @@ -0,0 +1,174 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+use std::collections::HashMap; +use std::sync::Arc; +use opendal::{Metakey, Operator}; +use anyhow::anyhow; +use crate::sink::encoder::{JsonEncoder, RowEncoder}; +use crate::sink::writer::LogSinkerOf; +use crate::sink::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use async_trait::async_trait; +use bytes::Bytes; +use itertools::Itertools; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_derive::Serialize; +use serde_json::Value; +use serde_with::serde_as; +use thiserror_ext::AsReport; +use with_options::WithOptions; + +use crate::sink::writer::SinkWriterExt; +use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; + +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct S3Common { + #[serde(rename = "s3.region_name")] + pub region_name: String, + #[serde(rename = "s3.bucket_name")] + pub bucket_name: String, + #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, + #[serde(rename = "s3.credentials.access", default)] + pub access: Option, + #[serde(rename = "s3.credentials.secret", default)] + pub secret: Option, + #[serde(rename = "s3.endpoint_url")] + pub endpoint_url: Option, + #[serde(rename = "s3.assume_role", default)] + pub assume_role: Option, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct S3Config { + #[serde(flatten)] + pub common: S3Common, + + pub r#type: String, // accept "append-only" or "upsert" +} + +pub const S3_SINK: &str = "s3"; + +impl S3Config { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } +} + +#[derive(Debug)] +pub struct S3Sink { + pub config: S3Config, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, +} + +impl S3Sink { + pub fn new( + config: S3Config, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + Ok(Self { + config, + schema, + pk_indices, + is_append_only, + }) + } +} + + +impl Sink for S3Sink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + + const SINK_NAME: &'static str = S3_SINK; + + async fn validate(&self) -> Result<()> { + todo!() + } + + async fn new_log_sinker( + &self, + writer_param: crate::sink::SinkWriterParam, + ) -> Result { + todo!() + } +} + +impl TryFrom for S3Sink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = S3Config::from_hashmap(param.properties)?; + S3Sink::new( + config, + schema, + param.downstream_pk, + param.sink_type.is_append_only(), + ) + } +} + + +pub struct S3SinkWriter { + pub config: S3Config, + schema: Schema, + op: Operator, + pk_indices: Vec, + is_append_only: bool, + row_encoder: JsonEncoder, +} + +#[async_trait] +impl SinkWriter for S3SinkWriter { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + todo!() + } + + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + todo!() + } + + async fn abort(&mut self) -> Result<()> { + todo!() + } + + async fn barrier(&mut 
self, _is_checkpoint: bool) -> Result<()> { + todo!() + } + + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + todo!() + } +} From dea340aa9aef61548f2ba7add0e3937a212d79ab Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Mar 2024 14:35:44 +0800 Subject: [PATCH 2/9] save work, add gcs --- src/connector/src/sink/encoder/json.rs | 32 ++++- src/connector/src/sink/encoder/mod.rs | 1 + src/connector/src/sink/mod.rs | 3 + src/connector/src/sink/opendal/gcs.rs | 170 +++++++++++++++++++++++-- src/connector/src/sink/opendal/mod.rs | 113 +++++++++++++--- src/connector/src/sink/opendal/s3.rs | 88 +++++++------ 6 files changed, 338 insertions(+), 69 deletions(-) diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 64a06ff70770..f3cae9f75fc0 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -114,6 +114,23 @@ impl JsonEncoder { } } + pub fn new_with_s3( + schema: Schema, + col_indices: Option>, + map: HashMap, + ) -> Self { + Self { + schema, + col_indices, + time_handling_mode: TimeHandlingMode::Milli, + date_handling_mode: DateHandlingMode::String, + timestamp_handling_mode: TimestampHandlingMode::String, + timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix, + custom_json_type: CustomJsonType::S3, + kafka_connect: None, + } + } + pub fn new_with_bigquery(schema: Schema, col_indices: Option>) -> Self { Self { schema, @@ -259,7 +276,10 @@ fn datum_to_json_object( } json!(v_string) } - CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => { + CustomJsonType::Es + | CustomJsonType::None + | CustomJsonType::BigQuery + | CustomJsonType::S3 => { json!(v.to_text()) } }, @@ -311,7 +331,10 @@ fn datum_to_json_object( } (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type { CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(), - CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => { + CustomJsonType::Doris(_) + | CustomJsonType::None + | CustomJsonType::BigQuery + | CustomJsonType::S3 => { json!(jsonb_ref.to_string()) } }, @@ -362,7 +385,10 @@ fn datum_to_json_object( "starrocks can't support struct".to_string(), )); } - CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => { + CustomJsonType::Es + | CustomJsonType::None + | CustomJsonType::BigQuery + | CustomJsonType::S3 => { let mut map = Map::with_capacity(st.len()); for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug( st.iter() diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 3254447e2707..97b2cb1f8eb9 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -144,6 +144,7 @@ pub enum CustomJsonType { Es, // starrocks' need jsonb is struct StarRocks(HashMap), + S3, // bigquery need null array -> [] BigQuery, None, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 858492b83b82..1d6f5c9b0e66 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -90,6 +90,7 @@ macro_rules! 
for_all_sinks { { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, { S3, $crate::sink::opendal::s3::S3Sink }, + { GCS, $crate::sink::opendal::gcs::GcsSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, @@ -529,6 +530,8 @@ pub enum SinkError { Starrocks(String), #[error("S3 error: {0}")] S3(String), + #[error("Gcs error: {0}")] + GCS(String), #[error("Pulsar error: {0}")] Pulsar( #[source] diff --git a/src/connector/src/sink/opendal/gcs.rs b/src/connector/src/sink/opendal/gcs.rs index 09c0891ee843..3c7bdfc56ba6 100644 --- a/src/connector/src/sink/opendal/gcs.rs +++ b/src/connector/src/sink/opendal/gcs.rs @@ -11,13 +11,165 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::sync::Arc; -// impl OpendalSinkWriter { -// /// create opendal gcs sink. -// pub fn new_gcs_sink( -// gcs_properties: gcsPropertiesCommon, -// assume_role: Option, -// ) -> Result { -// todo!(); -// } -// } +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use itertools::Itertools; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Gcs; +use crate::sink::opendal::OpenDalSinkWriter; +use opendal::{Metakey, Operator}; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_derive::Serialize; +use serde_json::Value; +use serde_with::serde_as; +use thiserror_ext::AsReport; +use with_options::WithOptions; + +use crate::error::ConnectorError; +use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::{ + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; + +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct GcsCommon { + #[serde(rename = "gcs.bucket_name")] + pub bucket_name: String, + + /// The base64 encoded credential key. If not set, ADC will be used. + #[serde(rename = "gcs.credential")] + pub credential: Option, + + /// If credential/ADC is not set. The service account can be used to provide the credential info. 
+ #[serde(rename = "gcs.service_account", default)] + pub service_account: Option, + + #[serde(rename = "match_pattern", default)] + pub match_pattern: Option, + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct GcsConfig { + #[serde(flatten)] + pub common: GcsCommon, + + pub r#type: String, // accept "append-only" or "upsert" +} + +pub const GCS_SINK: &str = "gcs"; + +impl GcsConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } +} + +#[derive(Debug)] +pub struct GcsSink { + pub config: GcsConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, +} + +impl GcsSink { + pub fn new( + config: GcsConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + Ok(Self { + config, + schema, + pk_indices, + is_append_only, + }) + } +} + +impl Sink for GcsSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + const SINK_NAME: &'static str = GCS_SINK; + + async fn validate(&self) -> Result<()> { + todo!() + } + + async fn new_log_sinker( + &self, + writer_param: crate::sink::SinkWriterParam, + ) -> Result { + todo!() + } +} + +impl TryFrom for GcsSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = GcsConfig::from_hashmap(param.properties)?; + GcsSink::new( + config, + schema, + param.downstream_pk, + param.sink_type.is_append_only(), + ) + } +} + +impl OpenDalSinkWriter { + pub async fn new_gcs_sink(config: GcsConfig) -> Result { + // Create gcs builder. + let mut builder = Gcs::default(); + + builder.bucket(&config.common.bucket_name); + + // if credential env is set, use it. Otherwise, ADC will be used. + if let Some(cred) = config.common.credential { + builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } + } + + if let Some(service_account) = config.common.service_account { + builder.service_account(&service_account); + } + let operator: Operator = Operator::new(builder) + .map_err(|e| SinkError::Connector(e.into()))? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) + } +} diff --git a/src/connector/src/sink/opendal/mod.rs b/src/connector/src/sink/opendal/mod.rs index 50b3f05fc18f..495c1673c040 100644 --- a/src/connector/src/sink/opendal/mod.rs +++ b/src/connector/src/sink/opendal/mod.rs @@ -12,35 +12,106 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub mod gcs; pub mod s3; -// pub mod gcs; + +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::anyhow; +use async_trait::async_trait; +use bytes::Bytes; +use itertools::Itertools; +use opendal::{Metakey, Operator}; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::buffer::Bitmap; +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; +use serde::Deserialize; +use serde_derive::Serialize; +use serde_json::Value; +use serde_with::serde_as; +use thiserror_ext::AsReport; +use with_options::WithOptions; + +use crate::error::ConnectorError; +use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::{ + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; pub const GCS_SINK: &str = "gcs"; -// pub trait OpendalSink: Send + Sync + 'static + Clone + PartialEq { -// type Properties: SourceProperties + Send + Sync; -// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult>; -// } +pub struct OpenDalSinkWriter { + schema: Schema, + op: Operator, + pk_indices: Vec, + is_append_only: bool, + row_encoder: JsonEncoder, + path: String, +} + +#[async_trait] +impl SinkWriter for OpenDalSinkWriter { + async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { + let path = &self.path.clone(); + if self.is_append_only { + self.append_only(chunk, path).await + } else { + unimplemented!() + } + } -// #[derive(Debug, Clone, Copy, PartialEq, Eq)] -// pub struct OpendalS3; + async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { + Ok(()) + } -// impl OpendalSink for OpendalS3 { -// type Properties = OpendalS3Properties; + async fn abort(&mut self) -> Result<()> { + Ok(()) + } -// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult> { -// todo!() -// } -// } + async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { + todo!() + } -// #[derive(Debug, Clone, Copy, PartialEq, Eq)] -// pub struct OpendalGcs; + async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { + Ok(()) + } +} -// impl OpendalSink for OpendalGcs { -// type Properties = GcsProperties; +impl OpenDalSinkWriter { + pub async fn new( + op: Operator, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + path: &str, + ) -> Result { + let mut decimal_map = HashMap::default(); + Ok(Self { + schema: schema.clone(), + pk_indices, + op, + is_append_only, + row_encoder: JsonEncoder::new_with_s3(schema, None, decimal_map), + path: path.to_string(), + }) + } -// fn new_sink_engine(properties: Self::Properties) -> ConnectorResult> { -// OpendalEnumerator::new_gcs_source(properties) -// } -// } + async fn append_only(&mut self, chunk: StreamChunk, path: &str) -> Result<()> { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); + self.op + .write(path, row_json_string) + .await + .map_err(|e| SinkError::Connector(e.into()))?; + } + Ok(()) + } +} diff --git a/src/connector/src/sink/opendal/s3.rs b/src/connector/src/sink/opendal/s3.rs index 4f1492dbbefa..5dc9f546e12c 100644 --- a/src/connector/src/sink/opendal/s3.rs +++ b/src/connector/src/sink/opendal/s3.rs @@ -13,14 +13,16 @@ // limitations under the License. 
use std::collections::HashMap; use std::sync::Arc; -use opendal::{Metakey, Operator}; + use anyhow::anyhow; -use crate::sink::encoder::{JsonEncoder, RowEncoder}; -use crate::sink::writer::LogSinkerOf; -use crate::sink::{SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; use async_trait::async_trait; use bytes::Bytes; +use deltalake::storage::s3; +use crate::sink::opendal::OpenDalSinkWriter; use itertools::Itertools; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::S3; +use opendal::{Metakey, Operator}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -32,8 +34,13 @@ use serde_with::serde_as; use thiserror_ext::AsReport; use with_options::WithOptions; -use crate::sink::writer::SinkWriterExt; -use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; +use crate::error::ConnectorError; +use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; +use crate::sink::{ + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, + SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, +}; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct S3Common { @@ -66,9 +73,8 @@ pub const S3_SINK: &str = "s3"; impl S3Config { pub fn from_hashmap(properties: HashMap) -> Result { - let config = - serde_json::from_value::(serde_json::to_value(properties).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; + let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( "`{}` must be {}, or {}", @@ -105,12 +111,10 @@ impl S3Sink { } } - impl Sink for S3Sink { type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; + type LogSinker = LogSinkerOf; - const SINK_NAME: &'static str = S3_SINK; async fn validate(&self) -> Result<()> { @@ -140,35 +144,47 @@ impl TryFrom for S3Sink { } } +impl OpenDalSinkWriter { + pub async fn new_s3_sink(config: S3Config) -> Result { + // Create s3 builder. 
+ let mut builder = S3::default(); + builder.bucket(&config.common.bucket_name); + builder.region(&config.common.region_name); -pub struct S3SinkWriter { - pub config: S3Config, - schema: Schema, - op: Operator, - pk_indices: Vec, - is_append_only: bool, - row_encoder: JsonEncoder, -} + if let Some(endpoint_url) = config.common.endpoint_url { + builder.endpoint(&endpoint_url); + } -#[async_trait] -impl SinkWriter for S3SinkWriter { - async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - todo!() - } + if let Some(access) = config.common.access { + builder.access_key_id(&access); + } else { + tracing::error!( + "access key id of aws s3 is not set, bucket {}", + config.common.bucket_name + ); + } - async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> { - todo!() - } + if let Some(secret) = config.common.secret { + builder.secret_access_key(&secret); + } else { + tracing::error!( + "secret access key of aws s3 is not set, bucket {}", + config.common.bucket_name + ); + } - async fn abort(&mut self) -> Result<()> { - todo!() - } + builder.enable_virtual_host_style(); - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - todo!() - } + if let Some(assume_role) = config.common.assume_role { + builder.role_arn(&assume_role); + } - async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { - todo!() + let operator: Operator = Operator::new(builder) + .map_err(|e| SinkError::Connector(e.into()))? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + Ok(operator) } } From 22fe512724fa018379d1f9915cc491e830e3573b Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Mar 2024 15:51:45 +0800 Subject: [PATCH 3/9] implement sink writer --- src/connector/Cargo.toml | 2 +- src/connector/src/sink/encoder/json.rs | 2 +- src/connector/src/sink/mod.rs | 19 +-- .../src/sink/{opendal => opendal_sink}/gcs.rs | 101 ++++++++-------- .../src/sink/{opendal => opendal_sink}/mod.rs | 59 ++++----- .../src/sink/{opendal => opendal_sink}/s3.rs | 114 +++++++++--------- 6 files changed, 149 insertions(+), 148 deletions(-) rename src/connector/src/sink/{opendal => opendal_sink}/gcs.rs (82%) rename src/connector/src/sink/{opendal => opendal_sink}/mod.rs (62%) rename src/connector/src/sink/{opendal => opendal_sink}/s3.rs (83%) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 77f2c1374dc9..65ead4ce3230 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -82,7 +82,7 @@ mysql_common = { version = "0.31", default-features = false, features = [ ] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" -opendal = "0.44" +opendal = "0.44.2" parking_lot = "0.12" paste = "1" prometheus = { version = "0.13", features = ["process"] } diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index f3cae9f75fc0..13fce6f5cfe7 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -117,7 +117,7 @@ impl JsonEncoder { pub fn new_with_s3( schema: Schema, col_indices: Option>, - map: HashMap, + _map: HashMap, ) -> Self { Self { schema, diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 1d6f5c9b0e66..0137bf5dc7b8 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -29,7 +29,7 @@ pub mod kinesis; pub mod log_store; pub mod mock_coordination_client; pub mod nats; -pub mod opendal; +pub mod opendal_sink; pub mod pulsar; pub mod redis; pub mod remote; @@ 
-47,6 +47,7 @@ use ::deltalake::DeltaTableError; use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; +use opendal::Error as OpendalError; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::metrics::{ @@ -89,8 +90,8 @@ macro_rules! for_all_sinks { { HttpJava, $crate::sink::remote::HttpJavaSink }, { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, - { S3, $crate::sink::opendal::s3::S3Sink }, - { GCS, $crate::sink::opendal::gcs::GcsSink }, + { S3, $crate::sink::opendal_sink::s3::S3Sink }, + { GCS, $crate::sink::opendal_sink::gcs::GcsSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, @@ -528,10 +529,8 @@ pub enum SinkError { ), #[error("Starrocks error: {0}")] Starrocks(String), - #[error("S3 error: {0}")] - S3(String), - #[error("Gcs error: {0}")] - GCS(String), + #[error("Opendal error: {0}")] + Opendal(String), #[error("Pulsar error: {0}")] Pulsar( #[source] @@ -564,6 +563,12 @@ impl From for SinkError { } } +impl From for SinkError { + fn from(error: OpendalError) -> Self { + SinkError::Opendal(error.to_string()) + } +} + impl From for SinkError { fn from(value: RpcError) -> Self { SinkError::Remote(anyhow!(value)) diff --git a/src/connector/src/sink/opendal/gcs.rs b/src/connector/src/sink/opendal_sink/gcs.rs similarity index 82% rename from src/connector/src/sink/opendal/gcs.rs rename to src/connector/src/sink/opendal_sink/gcs.rs index 3c7bdfc56ba6..03b89d1fcfdf 100644 --- a/src/connector/src/sink/opendal/gcs.rs +++ b/src/connector/src/sink/opendal_sink/gcs.rs @@ -12,35 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. 
use std::collections::HashMap; -use std::sync::Arc; use anyhow::anyhow; -use async_trait::async_trait; -use bytes::Bytes; -use itertools::Itertools; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::Gcs; -use crate::sink::opendal::OpenDalSinkWriter; -use opendal::{Metakey, Operator}; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::buffer::Bitmap; +use opendal::Operator; use risingwave_common::catalog::Schema; -use risingwave_common::types::DataType; use serde::Deserialize; -use serde_derive::Serialize; -use serde_json::Value; use serde_with::serde_as; -use thiserror_ext::AsReport; use with_options::WithOptions; -use crate::error::ConnectorError; -use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::encoder::RowEncoder; +use crate::sink::opendal_sink::OpenDalSinkWriter; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; +const GCS_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024; #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct GcsCommon { #[serde(rename = "gcs.bucket_name")] @@ -54,8 +44,8 @@ pub struct GcsCommon { #[serde(rename = "gcs.service_account", default)] pub service_account: Option, - #[serde(rename = "match_pattern", default)] - pub match_pattern: Option, + #[serde(rename = "gcs.path", default)] + pub path: String, #[serde(flatten)] pub unknown_fields: HashMap, @@ -112,6 +102,34 @@ impl GcsSink { } } +impl GcsSink { + pub async fn new_gcs_sink(config: GcsConfig) -> Result { + // Create gcs builder. + let mut builder = Gcs::default(); + + builder.bucket(&config.common.bucket_name); + + // if credential env is set, use it. Otherwise, ADC will be used. + if let Some(cred) = config.common.credential { + builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } + } + + if let Some(service_account) = config.common.service_account { + builder.service_account(&service_account); + } + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) + } +} + impl Sink for GcsSink { type Coordinator = DummySinkCommitCoordinator; type LogSinker = LogSinkerOf; @@ -119,14 +137,28 @@ impl Sink for GcsSink { const SINK_NAME: &'static str = GCS_SINK; async fn validate(&self) -> Result<()> { - todo!() + let op = Self::new_gcs_sink(self.config.clone()).await?; + Ok(()) } async fn new_log_sinker( &self, writer_param: crate::sink::SinkWriterParam, ) -> Result { - todo!() + let op = Self::new_gcs_sink(self.config.clone()).await?; + let path = self.config.common.path.as_ref(); + let writer = op + .writer_with(&path) + .concurrent(8) + .buffer(GCS_WRITE_BUFFER_SIZE) + .await?; + Ok(OpenDalSinkWriter::new( + writer, + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + )? + .into_log_sinker(writer_param.sink_metrics)) } } @@ -144,32 +176,3 @@ impl TryFrom for GcsSink { ) } } - -impl OpenDalSinkWriter { - pub async fn new_gcs_sink(config: GcsConfig) -> Result { - // Create gcs builder. - let mut builder = Gcs::default(); - - builder.bucket(&config.common.bucket_name); - - // if credential env is set, use it. 
Otherwise, ADC will be used. - if let Some(cred) = config.common.credential { - builder.credential(&cred); - } else { - let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); - if let Ok(cred) = cred { - builder.credential(&cred); - } - } - - if let Some(service_account) = config.common.service_account { - builder.service_account(&service_account); - } - let operator: Operator = Operator::new(builder) - .map_err(|e| SinkError::Connector(e.into()))? - .layer(LoggingLayer::default()) - .layer(RetryLayer::default()) - .finish(); - Ok(operator) - } -} diff --git a/src/connector/src/sink/opendal/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs similarity index 62% rename from src/connector/src/sink/opendal/mod.rs rename to src/connector/src/sink/opendal_sink/mod.rs index 495c1673c040..2c20d1fe53da 100644 --- a/src/connector/src/sink/opendal/mod.rs +++ b/src/connector/src/sink/opendal_sink/mod.rs @@ -18,48 +18,31 @@ pub mod s3; use std::collections::HashMap; use std::sync::Arc; -use anyhow::anyhow; use async_trait::async_trait; -use bytes::Bytes; -use itertools::Itertools; -use opendal::{Metakey, Operator}; +use opendal::{Operator, Writer}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; -use risingwave_common::types::DataType; -use serde::Deserialize; -use serde_derive::Serialize; use serde_json::Value; -use serde_with::serde_as; -use thiserror_ext::AsReport; -use with_options::WithOptions; -use crate::error::ConnectorError; -use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; -use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; -use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, -}; +use crate::sink::encoder::{JsonEncoder, RowEncoder}; +use crate::sink::{Result, SinkError, SinkWriter}; pub const GCS_SINK: &str = "gcs"; - pub struct OpenDalSinkWriter { schema: Schema, - op: Operator, + writer: Writer, pk_indices: Vec, is_append_only: bool, row_encoder: JsonEncoder, - path: String, } #[async_trait] impl SinkWriter for OpenDalSinkWriter { async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> { - let path = &self.path.clone(); if self.is_append_only { - self.append_only(chunk, path).await + self.append_only(chunk).await } else { unimplemented!() } @@ -70,11 +53,22 @@ impl SinkWriter for OpenDalSinkWriter { } async fn abort(&mut self) -> Result<()> { + self.writer.abort().await?; Ok(()) } - async fn barrier(&mut self, _is_checkpoint: bool) -> Result<()> { - todo!() + async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { + if is_checkpoint { + match self.writer.close().await { + Ok(_) => (), + Err(err) => { + self.writer.abort().await?; + return Err(err.into()); + } + }; + } + + Ok(()) } async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc) -> Result<()> { @@ -83,34 +77,29 @@ impl SinkWriter for OpenDalSinkWriter { } impl OpenDalSinkWriter { - pub async fn new( - op: Operator, + pub fn new( + writer: Writer, schema: Schema, pk_indices: Vec, is_append_only: bool, - path: &str, ) -> Result { - let mut decimal_map = HashMap::default(); + let decimal_map = HashMap::default(); Ok(Self { schema: schema.clone(), pk_indices, - op, + writer, is_append_only, row_encoder: JsonEncoder::new_with_s3(schema, None, decimal_map), - path: path.to_string(), }) } - async fn append_only(&mut self, chunk: StreamChunk, path: &str) -> Result<()> { + async fn 
append_only(&mut self, chunk: StreamChunk) -> Result<()> { for (op, row) in chunk.rows() { if op != Op::Insert { continue; } let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.op - .write(path, row_json_string) - .await - .map_err(|e| SinkError::Connector(e.into()))?; + self.writer.write(row_json_string).await?; } Ok(()) } diff --git a/src/connector/src/sink/opendal/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs similarity index 83% rename from src/connector/src/sink/opendal/s3.rs rename to src/connector/src/sink/opendal_sink/s3.rs index 5dc9f546e12c..b69cb484b6e0 100644 --- a/src/connector/src/sink/opendal/s3.rs +++ b/src/connector/src/sink/opendal_sink/s3.rs @@ -12,44 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::collections::HashMap; -use std::sync::Arc; use anyhow::anyhow; -use async_trait::async_trait; -use bytes::Bytes; -use deltalake::storage::s3; -use crate::sink::opendal::OpenDalSinkWriter; -use itertools::Itertools; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::S3; -use opendal::{Metakey, Operator}; -use risingwave_common::array::{Op, StreamChunk}; -use risingwave_common::buffer::Bitmap; +use opendal::Operator; use risingwave_common::catalog::Schema; -use risingwave_common::types::DataType; use serde::Deserialize; -use serde_derive::Serialize; -use serde_json::Value; use serde_with::serde_as; -use thiserror_ext::AsReport; use with_options::WithOptions; -use crate::error::ConnectorError; -use crate::sink::encoder::{JsonEncoder, RowEncoder, TimestampHandlingMode}; +use crate::sink::encoder::RowEncoder; +use crate::sink::opendal_sink::OpenDalSinkWriter; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ - DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SinkWriter, SinkWriterParam, - SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, + DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, + SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; +const S3_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024; + #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct S3Common { #[serde(rename = "s3.region_name")] pub region_name: String, #[serde(rename = "s3.bucket_name")] pub bucket_name: String, - #[serde(rename = "match_pattern", default)] - pub match_pattern: Option, + #[serde(rename = "s3.path", default)] + pub path: String, #[serde(rename = "s3.credentials.access", default)] pub access: Option, #[serde(rename = "s3.credentials.secret", default)] @@ -58,6 +48,7 @@ pub struct S3Common { pub endpoint_url: Option, #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, + } #[serde_as] @@ -111,40 +102,7 @@ impl S3Sink { } } -impl Sink for S3Sink { - type Coordinator = DummySinkCommitCoordinator; - type LogSinker = LogSinkerOf; - - const SINK_NAME: &'static str = S3_SINK; - - async fn validate(&self) -> Result<()> { - todo!() - } - - async fn new_log_sinker( - &self, - writer_param: crate::sink::SinkWriterParam, - ) -> Result { - todo!() - } -} - -impl TryFrom for S3Sink { - type Error = SinkError; - - fn try_from(param: SinkParam) -> std::result::Result { - let schema = param.schema(); - let config = S3Config::from_hashmap(param.properties)?; - S3Sink::new( - config, - schema, - param.downstream_pk, - param.sink_type.is_append_only(), - ) - } -} - -impl OpenDalSinkWriter { +impl S3Sink { pub async fn new_s3_sink(config: S3Config) -> Result { // Create s3 builder. 
let mut builder = S3::default(); @@ -179,8 +137,7 @@ impl OpenDalSinkWriter { builder.role_arn(&assume_role); } - let operator: Operator = Operator::new(builder) - .map_err(|e| SinkError::Connector(e.into()))? + let operator: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) .finish(); @@ -188,3 +145,50 @@ impl OpenDalSinkWriter { Ok(operator) } } + +impl Sink for S3Sink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + const SINK_NAME: &'static str = S3_SINK; + + async fn validate(&self) -> Result<()> { + let op = Self::new_s3_sink(self.config.clone()).await?; + Ok(()) + } + + async fn new_log_sinker( + &self, + writer_param: crate::sink::SinkWriterParam, + ) -> Result { + let op = Self::new_s3_sink(self.config.clone()).await?; + let path = self.config.common.path.as_ref(); + let writer = op + .writer_with(&path) + .concurrent(8) + .buffer(S3_WRITE_BUFFER_SIZE) + .await?; + Ok(OpenDalSinkWriter::new( + writer, + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + )? + .into_log_sinker(writer_param.sink_metrics)) + } +} + +impl TryFrom for S3Sink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = S3Config::from_hashmap(param.properties)?; + S3Sink::new( + config, + schema, + param.downstream_pk, + param.sink_type.is_append_only(), + ) + } +} From 0b471174de2d2934de4c1dd77e5d6edd358f9137 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Tue, 12 Mar 2024 16:01:35 +0800 Subject: [PATCH 4/9] make clippy happy --- src/connector/src/sink/mod.rs | 2 +- src/connector/src/sink/opendal_sink/gcs.rs | 9 ++++----- src/connector/src/sink/opendal_sink/mod.rs | 4 ++-- src/connector/src/sink/opendal_sink/s3.rs | 10 ++++------ 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 0137bf5dc7b8..5fd3d57d0742 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -91,7 +91,7 @@ macro_rules! for_all_sinks { { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, { S3, $crate::sink::opendal_sink::s3::S3Sink }, - { GCS, $crate::sink::opendal_sink::gcs::GcsSink }, + { Gcs, $crate::sink::opendal_sink::gcs::GcsSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, { BigQuery, $crate::sink::big_query::BigQuerySink }, { Test, $crate::sink::test_sink::TestSink }, diff --git a/src/connector/src/sink/opendal_sink/gcs.rs b/src/connector/src/sink/opendal_sink/gcs.rs index 03b89d1fcfdf..360e9bdce848 100644 --- a/src/connector/src/sink/opendal_sink/gcs.rs +++ b/src/connector/src/sink/opendal_sink/gcs.rs @@ -22,7 +22,6 @@ use serde::Deserialize; use serde_with::serde_as; use with_options::WithOptions; -use crate::sink::encoder::RowEncoder; use crate::sink::opendal_sink::OpenDalSinkWriter; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ @@ -103,7 +102,7 @@ impl GcsSink { } impl GcsSink { - pub async fn new_gcs_sink(config: GcsConfig) -> Result { + pub fn new_gcs_sink(config: GcsConfig) -> Result { // Create gcs builder. 
let mut builder = Gcs::default(); @@ -137,7 +136,7 @@ impl Sink for GcsSink { const SINK_NAME: &'static str = GCS_SINK; async fn validate(&self) -> Result<()> { - let op = Self::new_gcs_sink(self.config.clone()).await?; + let _op = Self::new_gcs_sink(self.config.clone())?; Ok(()) } @@ -145,10 +144,10 @@ impl Sink for GcsSink { &self, writer_param: crate::sink::SinkWriterParam, ) -> Result { - let op = Self::new_gcs_sink(self.config.clone()).await?; + let op = Self::new_gcs_sink(self.config.clone())?; let path = self.config.common.path.as_ref(); let writer = op - .writer_with(&path) + .writer_with(path) .concurrent(8) .buffer(GCS_WRITE_BUFFER_SIZE) .await?; diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs index 2c20d1fe53da..0b68b0956a78 100644 --- a/src/connector/src/sink/opendal_sink/mod.rs +++ b/src/connector/src/sink/opendal_sink/mod.rs @@ -19,14 +19,14 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use opendal::{Operator, Writer}; +use opendal::Writer; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use serde_json::Value; use crate::sink::encoder::{JsonEncoder, RowEncoder}; -use crate::sink::{Result, SinkError, SinkWriter}; +use crate::sink::{Result, SinkWriter}; pub const GCS_SINK: &str = "gcs"; diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs index b69cb484b6e0..7b0f2422f14c 100644 --- a/src/connector/src/sink/opendal_sink/s3.rs +++ b/src/connector/src/sink/opendal_sink/s3.rs @@ -22,7 +22,6 @@ use serde::Deserialize; use serde_with::serde_as; use with_options::WithOptions; -use crate::sink::encoder::RowEncoder; use crate::sink::opendal_sink::OpenDalSinkWriter; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ @@ -48,7 +47,6 @@ pub struct S3Common { pub endpoint_url: Option, #[serde(rename = "s3.assume_role", default)] pub assume_role: Option, - } #[serde_as] @@ -103,7 +101,7 @@ impl S3Sink { } impl S3Sink { - pub async fn new_s3_sink(config: S3Config) -> Result { + pub fn new_s3_sink(config: S3Config) -> Result { // Create s3 builder. 
let mut builder = S3::default(); builder.bucket(&config.common.bucket_name); @@ -153,7 +151,7 @@ impl Sink for S3Sink { const SINK_NAME: &'static str = S3_SINK; async fn validate(&self) -> Result<()> { - let op = Self::new_s3_sink(self.config.clone()).await?; + let _op = Self::new_s3_sink(self.config.clone())?; Ok(()) } @@ -161,10 +159,10 @@ impl Sink for S3Sink { &self, writer_param: crate::sink::SinkWriterParam, ) -> Result { - let op = Self::new_s3_sink(self.config.clone()).await?; + let op = Self::new_s3_sink(self.config.clone())?; let path = self.config.common.path.as_ref(); let writer = op - .writer_with(&path) + .writer_with(path) .concurrent(8) .buffer(S3_WRITE_BUFFER_SIZE) .await?; From ad6f3a34986d301b4da32075501a768d4e03a65f Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Mar 2024 17:31:52 +0800 Subject: [PATCH 5/9] save work, add parquet writer --- Cargo.lock | 1 + src/connector/Cargo.toml | 1 + src/connector/src/sink/mod.rs | 13 ++++++ src/connector/src/sink/opendal_sink/gcs.rs | 18 +++++++-- src/connector/src/sink/opendal_sink/mod.rs | 46 +++++++++++----------- src/connector/src/sink/opendal_sink/s3.rs | 19 +++++++-- 6 files changed, 67 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 928de9d3f645..859cc20c87c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9228,6 +9228,7 @@ dependencies = [ "num-bigint", "opendal", "parking_lot 0.12.1", + "parquet 50.0.0", "paste", "pretty_assertions", "prometheus", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 65ead4ce3230..b1febda7078f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -84,6 +84,7 @@ nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" opendal = "0.44.2" parking_lot = "0.12" +parquet = "50.0.0" paste = "1" prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.12", features = ["no-recursion-limit"] } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 5fd3d57d0742..0d8975683d00 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -48,6 +48,7 @@ use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; use opendal::Error as OpendalError; +use risingwave_common::array::ArrayError; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::metrics::{ @@ -569,6 +570,18 @@ impl From for SinkError { } } +impl From for SinkError { + fn from(error: parquet::errors::ParquetError) -> Self { + SinkError::Opendal(error.to_string()) + } +} + +impl From for SinkError { + fn from(error: ArrayError) -> Self { + SinkError::Opendal(error.to_string()) + } +} + impl From for SinkError { fn from(value: RpcError) -> Self { SinkError::Remote(anyhow!(value)) diff --git a/src/connector/src/sink/opendal_sink/gcs.rs b/src/connector/src/sink/opendal_sink/gcs.rs index 360e9bdce848..0dd2d0479f2e 100644 --- a/src/connector/src/sink/opendal_sink/gcs.rs +++ b/src/connector/src/sink/opendal_sink/gcs.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
use std::collections::HashMap; +use std::sync::Arc; use anyhow::anyhow; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::Gcs; -use opendal::Operator; +use opendal::{Operator, Writer as OpendalWriter}; +use parquet::arrow::async_writer::AsyncArrowWriter; use risingwave_common::catalog::Schema; use serde::Deserialize; use serde_with::serde_as; use with_options::WithOptions; -use crate::sink::opendal_sink::OpenDalSinkWriter; +use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, @@ -146,13 +148,21 @@ impl Sink for GcsSink { ) -> Result { let op = Self::new_gcs_sink(self.config.clone())?; let path = self.config.common.path.as_ref(); - let writer = op + let gcs_writer = op .writer_with(path) .concurrent(8) .buffer(GCS_WRITE_BUFFER_SIZE) .await?; + + let arrow_schema = change_schema_to_arrow_schema(self.schema.clone()); + let sink_writer: AsyncArrowWriter = AsyncArrowWriter::try_new( + gcs_writer, + Arc::new(arrow_schema), + GCS_WRITE_BUFFER_SIZE, + None, + )?; Ok(OpenDalSinkWriter::new( - writer, + sink_writer, self.schema.clone(), self.pk_indices.clone(), self.is_append_only, diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs index 0b68b0956a78..f06b5bbca0a3 100644 --- a/src/connector/src/sink/opendal_sink/mod.rs +++ b/src/connector/src/sink/opendal_sink/mod.rs @@ -14,13 +14,16 @@ pub mod gcs; pub mod s3; - use std::collections::HashMap; use std::sync::Arc; +use arrow_schema::{ + DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, +}; use async_trait::async_trait; -use opendal::Writer; -use risingwave_common::array::{Op, StreamChunk}; +use opendal::Writer as OpendalWriter; +use parquet::arrow::async_writer::AsyncArrowWriter; +use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; use serde_json::Value; @@ -32,10 +35,9 @@ pub const GCS_SINK: &str = "gcs"; pub struct OpenDalSinkWriter { schema: Schema, - writer: Writer, + writer: AsyncArrowWriter, pk_indices: Vec, is_append_only: bool, - row_encoder: JsonEncoder, } #[async_trait] @@ -53,19 +55,12 @@ impl SinkWriter for OpenDalSinkWriter { } async fn abort(&mut self) -> Result<()> { - self.writer.abort().await?; Ok(()) } async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> { if is_checkpoint { - match self.writer.close().await { - Ok(_) => (), - Err(err) => { - self.writer.abort().await?; - return Err(err.into()); - } - }; + todo!() } Ok(()) @@ -78,29 +73,34 @@ impl SinkWriter for OpenDalSinkWriter { impl OpenDalSinkWriter { pub fn new( - writer: Writer, + writer: AsyncArrowWriter, schema: Schema, pk_indices: Vec, is_append_only: bool, ) -> Result { - let decimal_map = HashMap::default(); Ok(Self { schema: schema.clone(), pk_indices, writer, is_append_only, - row_encoder: JsonEncoder::new_with_s3(schema, None, decimal_map), }) } async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string(); - self.writer.write(row_json_string).await?; - } + let (mut chunk, ops) = chunk.compact().into_parts(); + let filters = + chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::(); + 
chunk.set_visibility(filters); + let arrow_schema = change_schema_to_arrow_schema(self.schema.clone()); + let batch = to_record_batch_with_schema(Arc::new(arrow_schema), &chunk.compact())?; + self.writer.write(&batch).await?; + Ok(()) } } + +fn change_schema_to_arrow_schema( + schema: risingwave_common::catalog::Schema, +) -> arrow_schema::Schema { + todo!() +} diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs index 7b0f2422f14c..ec60ef2ab16c 100644 --- a/src/connector/src/sink/opendal_sink/s3.rs +++ b/src/connector/src/sink/opendal_sink/s3.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use anyhow::anyhow; use opendal::layers::{LoggingLayer, RetryLayer}; use opendal::services::S3; -use opendal::Operator; +use opendal::{Operator, Writer as OpendalWriter}; +use parquet::arrow::async_writer::AsyncArrowWriter; use risingwave_common::catalog::Schema; use serde::Deserialize; use serde_with::serde_as; use with_options::WithOptions; -use crate::sink::opendal_sink::OpenDalSinkWriter; +use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter}; use crate::sink::writer::{LogSinkerOf, SinkWriterExt}; use crate::sink::{ DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY, @@ -161,13 +163,22 @@ impl Sink for S3Sink { ) -> Result { let op = Self::new_s3_sink(self.config.clone())?; let path = self.config.common.path.as_ref(); - let writer = op + let s3_writer = op .writer_with(path) .concurrent(8) .buffer(S3_WRITE_BUFFER_SIZE) .await?; + + let arrow_schema = change_schema_to_arrow_schema(self.schema.clone()); + let sink_writer: AsyncArrowWriter = AsyncArrowWriter::try_new( + s3_writer, + Arc::new(arrow_schema), + S3_WRITE_BUFFER_SIZE, + None, + )?; + Ok(OpenDalSinkWriter::new( - writer, + sink_writer, self.schema.clone(), self.pk_indices.clone(), self.is_append_only, From 08d05f071b0665cc2b2984b89f5d39ffd87c59a4 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Wed, 13 Mar 2024 17:33:14 +0800 Subject: [PATCH 6/9] minor --- src/connector/src/sink/opendal_sink/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs index f06b5bbca0a3..5a9099ce33cc 100644 --- a/src/connector/src/sink/opendal_sink/mod.rs +++ b/src/connector/src/sink/opendal_sink/mod.rs @@ -14,21 +14,19 @@ pub mod gcs; pub mod s3; -use std::collections::HashMap; + use std::sync::Arc; -use arrow_schema::{ - DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, -}; + use async_trait::async_trait; use opendal::Writer as OpendalWriter; use parquet::arrow::async_writer::AsyncArrowWriter; use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; -use serde_json::Value; -use crate::sink::encoder::{JsonEncoder, RowEncoder}; + +use crate::sink::encoder::{RowEncoder}; use crate::sink::{Result, SinkWriter}; pub const GCS_SINK: &str = "gcs"; @@ -100,7 +98,7 @@ impl OpenDalSinkWriter { } fn change_schema_to_arrow_schema( - schema: risingwave_common::catalog::Schema, + _schema: risingwave_common::catalog::Schema, ) -> arrow_schema::Schema { todo!() } From f4618c10d2a2abaa6ad18e1e8d7635e6d950b2ff Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> 
diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs
index 7b0f2422f14c..ec60ef2ab16c 100644
--- a/src/connector/src/sink/opendal_sink/s3.rs
+++ b/src/connector/src/sink/opendal_sink/s3.rs
@@ -12,17 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use std::collections::HashMap;
+use std::sync::Arc;
 use anyhow::anyhow;
 use opendal::layers::{LoggingLayer, RetryLayer};
 use opendal::services::S3;
-use opendal::Operator;
+use opendal::{Operator, Writer as OpendalWriter};
+use parquet::arrow::async_writer::AsyncArrowWriter;
 use risingwave_common::catalog::Schema;
 use serde::Deserialize;
 use serde_with::serde_as;
 use with_options::WithOptions;
-use crate::sink::opendal_sink::OpenDalSinkWriter;
+use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
@@ -161,13 +163,22 @@ impl Sink for S3Sink {
     ) -> Result<Self::LogSinker> {
         let op = Self::new_s3_sink(self.config.clone())?;
         let path = self.config.common.path.as_ref();
-        let writer = op
+        let s3_writer = op
            .writer_with(path)
             .concurrent(8)
             .buffer(S3_WRITE_BUFFER_SIZE)
             .await?;
+
+        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
+        let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
+            s3_writer,
+            Arc::new(arrow_schema),
+            S3_WRITE_BUFFER_SIZE,
+            None,
+        )?;
+
         Ok(OpenDalSinkWriter::new(
-            writer,
+            sink_writer,
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,

From 08d05f071b0665cc2b2984b89f5d39ffd87c59a4 Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Wed, 13 Mar 2024 17:33:14 +0800
Subject: [PATCH 6/9] minor

---
 src/connector/src/sink/opendal_sink/mod.rs | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs
index f06b5bbca0a3..5a9099ce33cc 100644
--- a/src/connector/src/sink/opendal_sink/mod.rs
+++ b/src/connector/src/sink/opendal_sink/mod.rs
@@ -14,21 +14,19 @@
 pub mod gcs;
 pub mod s3;
-use std::collections::HashMap;
+
 use std::sync::Arc;
-use arrow_schema::{
-    DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef,
-};
+
 use async_trait::async_trait;
 use opendal::Writer as OpendalWriter;
 use parquet::arrow::async_writer::AsyncArrowWriter;
 use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
-use serde_json::Value;
 
-use crate::sink::encoder::{JsonEncoder, RowEncoder};
+
+use crate::sink::encoder::{RowEncoder};
 use crate::sink::{Result, SinkWriter};
 
 pub const GCS_SINK: &str = "gcs";
@@ -100,7 +98,7 @@ impl OpenDalSinkWriter {
 }
 
 fn change_schema_to_arrow_schema(
-    schema: risingwave_common::catalog::Schema,
+    _schema: risingwave_common::catalog::Schema,
 ) -> arrow_schema::Schema {
     todo!()
 }

From f4618c10d2a2abaa6ad18e1e8d7635e6d950b2ff Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Thu, 14 Mar 2024 16:14:16 +0800
Subject: [PATCH 7/9] add parquet writer, todo: add e2e test and comments

---
 src/connector/src/sink/opendal_sink/gcs.rs |  23 +----
 src/connector/src/sink/opendal_sink/mod.rs | 107 ++++++++++++++++-----
 src/connector/src/sink/opendal_sink/s3.rs  |  25 +----
 3 files changed, 94 insertions(+), 61 deletions(-)

diff --git a/src/connector/src/sink/opendal_sink/gcs.rs b/src/connector/src/sink/opendal_sink/gcs.rs
index 0dd2d0479f2e..2715fde48a0a 100644
--- a/src/connector/src/sink/opendal_sink/gcs.rs
+++ b/src/connector/src/sink/opendal_sink/gcs.rs
@@ -12,19 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use std::collections::HashMap;
-use std::sync::Arc;
 use anyhow::anyhow;
 use opendal::layers::{LoggingLayer, RetryLayer};
 use opendal::services::Gcs;
-use opendal::{Operator, Writer as OpendalWriter};
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use opendal::Operator;
 use risingwave_common::catalog::Schema;
 use serde::Deserialize;
 use serde_with::serde_as;
 use with_options::WithOptions;
-use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
+use crate::sink::opendal_sink::OpenDalSinkWriter;
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
@@ -148,21 +146,10 @@ impl Sink for GcsSink {
     ) -> Result<Self::LogSinker> {
         let op = Self::new_gcs_sink(self.config.clone())?;
         let path = self.config.common.path.as_ref();
-        let gcs_writer = op
-            .writer_with(path)
-            .concurrent(8)
-            .buffer(GCS_WRITE_BUFFER_SIZE)
-            .await?;
-
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
-            gcs_writer,
-            Arc::new(arrow_schema),
-            GCS_WRITE_BUFFER_SIZE,
-            None,
-        )?;
+
         Ok(OpenDalSinkWriter::new(
-            sink_writer,
+            op,
+            path,
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
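// Example (editor's sketch, not part of the patch): after this hunk the sinks hand the raw
// `Operator` plus target path to `OpenDalSinkWriter`, and the parquet writer is only
// materialized on the first `write_batch` (see `create_sink_writer` in the mod.rs diff
// below). A dependency-free sketch of that create-on-first-write pattern, with a `Vec<u8>`
// standing in for `AsyncArrowWriter<OpendalWriter>`:
struct LazySink {
    writer: Option<Vec<u8>>,
}

impl LazySink {
    fn write(&mut self, bytes: &[u8]) {
        // The expensive writer is only built when there is data to write, so an epoch
        // that carries no rows never opens an object on GCS/S3.
        let writer = self.writer.get_or_insert_with(Vec::new);
        writer.extend_from_slice(bytes);
    }

    fn checkpoint(&mut self) -> Option<Vec<u8>> {
        // On a checkpoint barrier the current writer is taken and closed; the next write
        // starts a fresh object.
        self.writer.take()
    }
}

fn main() {
    let mut sink = LazySink { writer: None };
    sink.write(b"some rows");
    assert_eq!(sink.checkpoint(), Some(b"some rows".to_vec()));
    // Nothing was written since the last checkpoint, so there is nothing to flush.
    assert_eq!(sink.checkpoint(), None);
}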
diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs
index 5a9099ce33cc..8415d9de84c7 100644
--- a/src/connector/src/sink/opendal_sink/mod.rs
+++ b/src/connector/src/sink/opendal_sink/mod.rs
@@ -14,33 +14,39 @@
 pub mod gcs;
 pub mod s3;
-
+use std::collections::HashMap;
 use std::sync::Arc;
-
+use anyhow::anyhow;
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef};
 use async_trait::async_trait;
-use opendal::Writer as OpendalWriter;
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use icelake::config::ParquetWriterConfig;
+use opendal::{Operator, Writer as OpendalWriter};
+use parquet::arrow::AsyncArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
 use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
 
+use crate::sink::{Result, SinkError, SinkWriter};
-
-use crate::sink::encoder::{RowEncoder};
-use crate::sink::{Result, SinkWriter};
 
-pub const GCS_SINK: &str = "gcs";
+const SINK_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
 
 pub struct OpenDalSinkWriter {
-    schema: Schema,
-    writer: AsyncArrowWriter<OpendalWriter>,
+    schema: SchemaRef,
+    operator: Operator,
+    sink_writer: Option<AsyncArrowWriter<OpendalWriter>>,
     pk_indices: Vec<usize>,
     is_append_only: bool,
+    write_path: String,
 }
 
 #[async_trait]
 impl SinkWriter for OpenDalSinkWriter {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.sink_writer.is_none() {
+            self.create_sink_writer().await?;
+        }
         if self.is_append_only {
             self.append_only(chunk).await
         } else {
@@ -58,7 +64,11 @@ impl SinkWriter for OpenDalSinkWriter {
 
     async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
         if is_checkpoint {
-            todo!()
+            let sink_writer = self
+                .sink_writer
+                .take()
+                .ok_or_else(|| SinkError::Opendal("Can't get sink writer".to_string()))?;
+            sink_writer.close().await?;
         }
 
         Ok(())
@@ -71,34 +81,87 @@ impl SinkWriter for OpenDalSinkWriter {
 
 impl OpenDalSinkWriter {
     pub fn new(
-        writer: AsyncArrowWriter<OpendalWriter>,
-        schema: Schema,
+        operator: Operator,
+        write_path: &str,
+        rw_schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
     ) -> Result<Self> {
+        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
         Ok(Self {
-            schema: schema.clone(),
+            schema: Arc::new(arrow_schema),
+            write_path: write_path.to_string(),
             pk_indices,
-            writer,
+            operator,
+            sink_writer: None,
             is_append_only,
         })
     }
 
+    async fn create_sink_writer(&mut self) -> Result<()> {
+        let object_store_writer = self
+            .operator
+            .writer_with(&self.write_path)
+            .concurrent(8)
+            .buffer(SINK_WRITE_BUFFER_SIZE)
+            .await?;
+        let parquet_config = ParquetWriterConfig::default();
+        let mut props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
+            .set_compression(parquet_config.compression)
+            .set_max_row_group_size(parquet_config.max_row_group_size)
+            .set_write_batch_size(parquet_config.write_batch_size)
+            .set_data_page_size_limit(parquet_config.data_page_size);
+        if let Some(created_by) = parquet_config.created_by.as_ref() {
+            props = props.set_created_by(created_by.to_string());
+        }
+        self.sink_writer = Some(AsyncArrowWriter::try_new(
+            object_store_writer,
+            self.schema.clone(),
+            SINK_WRITE_BUFFER_SIZE,
+            Some(props.build()),
+        )?);
+        Ok(())
+    }
+
     async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
         let (mut chunk, ops) = chunk.compact().into_parts();
         let filters =
             chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
         chunk.set_visibility(filters);
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let batch = to_record_batch_with_schema(Arc::new(arrow_schema), &chunk.compact())?;
-        self.writer.write(&batch).await?;
+
+        let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;
+
+        self.sink_writer
+            .as_mut()
+            .ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))?
+            .write(&batch)
+            .await?;
 
         Ok(())
     }
 }
 
-fn change_schema_to_arrow_schema(
-    _schema: risingwave_common::catalog::Schema,
-) -> arrow_schema::Schema {
-    todo!()
+fn convert_rw_schema_to_arrow_schema(
+    rw_schema: risingwave_common::catalog::Schema,
+) -> anyhow::Result<arrow_schema::Schema> {
+    let mut schema_fields = HashMap::new();
+    rw_schema.fields.iter().for_each(|field| {
+        let res = schema_fields.insert(&field.name, &field.data_type);
+        // This assert is to make sure there is no duplicate field name in the schema.
+        assert!(res.is_none())
+    });
+    let mut arrow_fileds = vec![];
+    for rw_field in &rw_schema.fields {
+        let converted_arrow_data_type =
+            ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?;
+        arrow_fileds.push(ArrowField::new(
+            rw_field.name.clone(),
+            converted_arrow_data_type,
+            false,
+        ));
+    }
+
+    Ok(arrow_schema::Schema::new(arrow_fileds))
 }
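// Example (editor's sketch, not part of the patch): `create_sink_writer` above derives its
// `WriterProperties` from icelake's `ParquetWriterConfig`. The standalone snippet below
// builds comparable properties with hand-picked values and writes one batch to an
// in-memory buffer via the synchronous `ArrowWriter`, so the knobs can be tried without an
// object store; the concrete values are illustrative only.
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::{WriterProperties, WriterVersion};

fn write_parquet_to_memory() -> anyhow::Result<Vec<u8>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(1024 * 1024)
        .set_write_batch_size(1024)
        .set_data_page_size_limit(256 * 1024)
        .build();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(props))?;
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
    writer.write(&batch)?;
    // `close` writes the parquet footer; only then is `buf` a complete file.
    writer.close()?;
    Ok(buf)
}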
diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs
index ec60ef2ab16c..dc0a19fe9273 100644
--- a/src/connector/src/sink/opendal_sink/s3.rs
+++ b/src/connector/src/sink/opendal_sink/s3.rs
@@ -12,27 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use std::collections::HashMap;
-use std::sync::Arc;
 use anyhow::anyhow;
 use opendal::layers::{LoggingLayer, RetryLayer};
 use opendal::services::S3;
-use opendal::{Operator, Writer as OpendalWriter};
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use opendal::Operator;
 use risingwave_common::catalog::Schema;
 use serde::Deserialize;
 use serde_with::serde_as;
 use with_options::WithOptions;
-use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
+use crate::sink::opendal_sink::OpenDalSinkWriter;
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
     SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
 };
 
-const S3_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
-
 #[derive(Deserialize, Debug, Clone, WithOptions)]
 pub struct S3Common {
     #[serde(rename = "s3.region_name")]
@@ -163,22 +159,9 @@ impl Sink for S3Sink {
     ) -> Result<Self::LogSinker> {
         let op = Self::new_s3_sink(self.config.clone())?;
         let path = self.config.common.path.as_ref();
-        let s3_writer = op
-            .writer_with(path)
-            .concurrent(8)
-            .buffer(S3_WRITE_BUFFER_SIZE)
-            .await?;
-
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
-            s3_writer,
-            Arc::new(arrow_schema),
-            S3_WRITE_BUFFER_SIZE,
-            None,
-        )?;
-
         Ok(OpenDalSinkWriter::new(
-            sink_writer,
+            op,
+            path,
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
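// Example (editor's sketch, not part of the patch): `Self::new_s3_sink(...)` is referenced
// above but not shown in this hunk. The function below illustrates how such an S3
// `Operator` is typically assembled from opendal's builder together with the
// `LoggingLayer`/`RetryLayer` already imported in s3.rs; the parameter list loosely mirrors
// `S3Common` ("s3.region_name", "s3.bucket_name", ...) and is an assumption, not the
// patch's actual implementation.
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use opendal::Operator;

fn build_s3_operator(
    region: &str,
    bucket: &str,
    access_key: &str,
    secret_key: &str,
) -> opendal::Result<Operator> {
    let mut builder = S3::default();
    builder.region(region);
    builder.bucket(bucket);
    builder.access_key_id(access_key);
    builder.secret_access_key(secret_key);

    Ok(Operator::new(builder)?
        // Retry transient request failures before surfacing an error to the sink.
        .layer(RetryLayer::default())
        // Log every operation; useful when debugging sink uploads.
        .layer(LoggingLayer::default())
        .finish())
}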
From 31a80520a977c08c76218fdc2c6d41a09501973a Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Thu, 14 Mar 2024 16:20:06 +0800
Subject: [PATCH 8/9] minor

---
 src/connector/src/sink/encoder/json.rs | 32 +++----------------------
 src/connector/src/sink/encoder/mod.rs  |  1 -
 2 files changed, 3 insertions(+), 30 deletions(-)

diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 13fce6f5cfe7..64a06ff70770 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -114,23 +114,6 @@ impl JsonEncoder {
         }
     }
 
-    pub fn new_with_s3(
-        schema: Schema,
-        col_indices: Option<Vec<usize>>,
-        _map: HashMap<String, (u8, u8)>,
-    ) -> Self {
-        Self {
-            schema,
-            col_indices,
-            time_handling_mode: TimeHandlingMode::Milli,
-            date_handling_mode: DateHandlingMode::String,
-            timestamp_handling_mode: TimestampHandlingMode::String,
-            timestamptz_handling_mode: TimestamptzHandlingMode::UtcWithoutSuffix,
-            custom_json_type: CustomJsonType::S3,
-            kafka_connect: None,
-        }
-    }
-
     pub fn new_with_bigquery(schema: Schema, col_indices: Option<Vec<usize>>) -> Self {
         Self {
             schema,
@@ -276,10 +259,7 @@ fn datum_to_json_object(
                 }
                 json!(v_string)
             }
-            CustomJsonType::Es
-            | CustomJsonType::None
-            | CustomJsonType::BigQuery
-            | CustomJsonType::S3 => {
+            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(v.to_text())
             }
         },
@@ -331,10 +311,7 @@ fn datum_to_json_object(
         }
         (DataType::Jsonb, ScalarRefImpl::Jsonb(jsonb_ref)) => match custom_json_type {
             CustomJsonType::Es | CustomJsonType::StarRocks(_) => JsonbVal::from(jsonb_ref).take(),
-            CustomJsonType::Doris(_)
-            | CustomJsonType::None
-            | CustomJsonType::BigQuery
-            | CustomJsonType::S3 => {
+            CustomJsonType::Doris(_) | CustomJsonType::None | CustomJsonType::BigQuery => {
                 json!(jsonb_ref.to_string())
             }
         },
@@ -385,10 +362,7 @@ fn datum_to_json_object(
                     "starrocks can't support struct".to_string(),
                 ));
             }
-            CustomJsonType::Es
-            | CustomJsonType::None
-            | CustomJsonType::BigQuery
-            | CustomJsonType::S3 => {
+            CustomJsonType::Es | CustomJsonType::None | CustomJsonType::BigQuery => {
                 let mut map = Map::with_capacity(st.len());
                 for (sub_datum_ref, sub_field) in struct_ref.iter_fields_ref().zip_eq_debug(
                     st.iter()
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 97b2cb1f8eb9..3254447e2707 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -144,7 +144,6 @@ pub enum CustomJsonType {
     Es,
     // starrocks' need jsonb is struct
     StarRocks(HashMap<String, (u8, u8)>),
-    S3,
     // bigquery need null array -> []
     BigQuery,
     None,

From d1b61a9d5e9d10e8efd65bc46265be37888b138a Mon Sep 17 00:00:00 2001
From: congyi <15605187270@163.com>
Date: Thu, 14 Mar 2024 17:11:07 +0800
Subject: [PATCH 9/9] fix typo

---
 src/connector/src/sink/opendal_sink/mod.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs
index 8415d9de84c7..723450cce164 100644
--- a/src/connector/src/sink/opendal_sink/mod.rs
+++ b/src/connector/src/sink/opendal_sink/mod.rs
@@ -152,16 +152,16 @@ fn convert_rw_schema_to_arrow_schema(
         // This assert is to make sure there is no duplicate field name in the schema.
         assert!(res.is_none())
     });
-    let mut arrow_fileds = vec![];
+    let mut arrow_fields = vec![];
     for rw_field in &rw_schema.fields {
         let converted_arrow_data_type =
             ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?;
-        arrow_fileds.push(ArrowField::new(
+        arrow_fields.push(ArrowField::new(
             rw_field.name.clone(),
             converted_arrow_data_type,
             false,
         ));
     }
 
-    Ok(arrow_schema::Schema::new(arrow_fileds))
+    Ok(arrow_schema::Schema::new(arrow_fields))
 }
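// Example (editor's sketch, not part of the patch): a possible unit test for
// `convert_rw_schema_to_arrow_schema`, assuming it sits at the bottom of
// opendal_sink/mod.rs and that RisingWave's usual test helpers (`Field::with_name`,
// `DataType`) are available. It only checks field names and nullability, which is what the
// conversion above controls directly.
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::convert_rw_schema_to_arrow_schema;

    #[test]
    fn rw_schema_maps_to_arrow_fields() {
        let rw_schema = Schema::new(vec![
            Field::with_name(DataType::Int64, "id"),
            Field::with_name(DataType::Varchar, "name"),
        ]);
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema).unwrap();

        let names: Vec<String> = arrow_schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect();
        assert_eq!(names, vec!["id".to_string(), "name".to_string()]);
        // The conversion currently marks every column as non-nullable.
        assert!(arrow_schema.fields().iter().all(|f| !f.is_nullable()));
    }
}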