diff --git a/Cargo.lock b/Cargo.lock
index bf8d3c00df2d5..859cc20c87c7f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -23,6 +23,17 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
 
+[[package]]
+name = "aes"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
+dependencies = [
+ "cfg-if",
+ "cipher",
+ "cpufeatures",
+]
+
 [[package]]
 name = "ahash"
 version = "0.7.7"
@@ -1647,6 +1658,15 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "block-padding"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
+dependencies = [
+ "generic-array",
+]
+
 [[package]]
 name = "blocking"
 version = "1.3.1"
@@ -1932,6 +1952,15 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
 
+[[package]]
+name = "cbc"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
+dependencies = [
+ "cipher",
+]
+
 [[package]]
 name = "cc"
 version = "1.0.83"
@@ -2046,6 +2075,16 @@ dependencies = [
  "half 1.8.2",
 ]
 
+[[package]]
+name = "cipher"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
+dependencies = [
+ "crypto-common",
+ "inout",
+]
+
 [[package]]
 name = "clang-sys"
 version = "1.6.1"
@@ -5232,6 +5271,16 @@ dependencies = [
  "syn 2.0.48",
 ]
 
+[[package]]
+name = "inout"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
+dependencies = [
+ "block-padding",
+ "generic-array",
+]
+
 [[package]]
 name = "inquire"
 version = "0.7.0"
@@ -7265,6 +7314,16 @@ dependencies = [
  "prost-types 0.11.9",
 ]
 
+[[package]]
+name = "pbkdf2"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
+dependencies = [
+ "digest",
+ "hmac",
+]
+
 [[package]]
 name = "peeking_take_while"
 version = "0.1.2"
@@ -7437,6 +7496,21 @@ dependencies = [
  "spki 0.7.2",
 ]
 
+[[package]]
+name = "pkcs5"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6"
+dependencies = [
+ "aes",
+ "cbc",
+ "der 0.7.8",
+ "pbkdf2",
+ "scrypt",
+ "sha2",
+ "spki 0.7.2",
+]
+
 [[package]]
 name = "pkcs8"
 version = "0.9.0"
@@ -7454,6 +7528,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
 dependencies = [
  "der 0.7.8",
+ "pkcs5",
+ "rand_core",
  "spki 0.7.2",
 ]
 
@@ -8469,9 +8545,9 @@ dependencies = [
 
 [[package]]
 name = "reqsign"
-version = "0.14.6"
+version = "0.14.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd"
+checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -8496,7 +8572,6 @@ dependencies = [
  "serde_json",
  "sha1",
  "sha2",
"tokio", ] [[package]] @@ -9153,6 +9228,7 @@ dependencies = [ "num-bigint", "opendal", "parking_lot 0.12.1", + "parquet 50.0.0", "paste", "pretty_assertions", "prometheus", @@ -10313,6 +10389,7 @@ dependencies = [ "pkcs1", "pkcs8 0.10.2", "rand_core", + "sha2", "signature 2.0.0", "spki 0.7.2", "subtle", @@ -10545,6 +10622,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "salsa20" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213" +dependencies = [ + "cipher", +] + [[package]] name = "same-file" version = "1.0.6" @@ -10611,6 +10697,17 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" +[[package]] +name = "scrypt" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" +dependencies = [ + "pbkdf2", + "salsa20", + "sha2", +] + [[package]] name = "sct" version = "0.7.0" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 77f2c1374dc92..b1febda7078f5 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -82,8 +82,9 @@ mysql_common = { version = "0.31", default-features = false, features = [ ] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" -opendal = "0.44" +opendal = "0.44.2" parking_lot = "0.12" +parquet = "50.0.0" paste = "1" prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.12", features = ["no-recursion-limit"] } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 7daf0883ac7e8..0d8975683d00f 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -29,6 +29,7 @@ pub mod kinesis; pub mod log_store; pub mod mock_coordination_client; pub mod nats; +pub mod opendal_sink; pub mod pulsar; pub mod redis; pub mod remote; @@ -46,6 +47,8 @@ use ::deltalake::DeltaTableError; use ::redis::RedisError; use anyhow::anyhow; use async_trait::async_trait; +use opendal::Error as OpendalError; +use risingwave_common::array::ArrayError; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::metrics::{ @@ -88,6 +91,8 @@ macro_rules! 
@@ -88,6 +91,8 @@ macro_rules! for_all_sinks {
         { HttpJava, $crate::sink::remote::HttpJavaSink },
         { Doris, $crate::sink::doris::DorisSink },
         { Starrocks, $crate::sink::starrocks::StarrocksSink },
+        { S3, $crate::sink::opendal_sink::s3::S3Sink },
+        { Gcs, $crate::sink::opendal_sink::gcs::GcsSink },
         { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
         { BigQuery, $crate::sink::big_query::BigQuerySink },
         { Test, $crate::sink::test_sink::TestSink },
@@ -525,6 +530,8 @@ pub enum SinkError {
     ),
     #[error("Starrocks error: {0}")]
     Starrocks(String),
+    #[error("Opendal error: {0}")]
+    Opendal(String),
     #[error("Pulsar error: {0}")]
     Pulsar(
         #[source]
@@ -557,6 +564,24 @@ impl From for SinkError {
     }
 }
 
+impl From<OpendalError> for SinkError {
+    fn from(error: OpendalError) -> Self {
+        SinkError::Opendal(error.to_string())
+    }
+}
+
+impl From<parquet::errors::ParquetError> for SinkError {
+    fn from(error: parquet::errors::ParquetError) -> Self {
+        SinkError::Opendal(error.to_string())
+    }
+}
+
+impl From<ArrayError> for SinkError {
+    fn from(error: ArrayError) -> Self {
+        SinkError::Opendal(error.to_string())
+    }
+}
+
 impl From<RpcError> for SinkError {
     fn from(value: RpcError) -> Self {
         SinkError::Remote(anyhow!(value))
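Note: the three `From` impls above are what let the new sink code apply `?` directly to OpenDAL, Parquet, and array errors. A minimal sketch of the pattern, using a hypothetical helper that is not part of this diff:

```rust
// Hypothetical helper, assuming crate::sink::Result<T> = Result<T, SinkError>.
// `op.write` returns opendal::Result<()>, so `?` converts the opendal::Error
// into SinkError::Opendal through the From impl added above.
async fn write_object(
    op: &opendal::Operator,
    path: &str,
    data: Vec<u8>,
) -> crate::sink::Result<()> {
    op.write(path, data).await?;
    Ok(())
}
```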
+ #[serde(rename = "gcs.service_account", default)] + pub service_account: Option, + + #[serde(rename = "gcs.path", default)] + pub path: String, + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct GcsConfig { + #[serde(flatten)] + pub common: GcsCommon, + + pub r#type: String, // accept "append-only" or "upsert" +} + +pub const GCS_SINK: &str = "gcs"; + +impl GcsConfig { + pub fn from_hashmap(properties: HashMap) -> Result { + let config = serde_json::from_value::(serde_json::to_value(properties).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } +} + +#[derive(Debug)] +pub struct GcsSink { + pub config: GcsConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, +} + +impl GcsSink { + pub fn new( + config: GcsConfig, + schema: Schema, + pk_indices: Vec, + is_append_only: bool, + ) -> Result { + Ok(Self { + config, + schema, + pk_indices, + is_append_only, + }) + } +} + +impl GcsSink { + pub fn new_gcs_sink(config: GcsConfig) -> Result { + // Create gcs builder. + let mut builder = Gcs::default(); + + builder.bucket(&config.common.bucket_name); + + // if credential env is set, use it. Otherwise, ADC will be used. + if let Some(cred) = config.common.credential { + builder.credential(&cred); + } else { + let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS"); + if let Ok(cred) = cred { + builder.credential(&cred); + } + } + + if let Some(service_account) = config.common.service_account { + builder.service_account(&service_account); + } + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + Ok(operator) + } +} + +impl Sink for GcsSink { + type Coordinator = DummySinkCommitCoordinator; + type LogSinker = LogSinkerOf; + + const SINK_NAME: &'static str = GCS_SINK; + + async fn validate(&self) -> Result<()> { + let _op = Self::new_gcs_sink(self.config.clone())?; + Ok(()) + } + + async fn new_log_sinker( + &self, + writer_param: crate::sink::SinkWriterParam, + ) -> Result { + let op = Self::new_gcs_sink(self.config.clone())?; + let path = self.config.common.path.as_ref(); + + Ok(OpenDalSinkWriter::new( + op, + path, + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + )? + .into_log_sinker(writer_param.sink_metrics)) + } +} + +impl TryFrom for GcsSink { + type Error = SinkError; + + fn try_from(param: SinkParam) -> std::result::Result { + let schema = param.schema(); + let config = GcsConfig::from_hashmap(param.properties)?; + GcsSink::new( + config, + schema, + param.downstream_pk, + param.sink_type.is_append_only(), + ) + } +} diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs new file mode 100644 index 0000000000000..723450cce1645 --- /dev/null +++ b/src/connector/src/sink/opendal_sink/mod.rs @@ -0,0 +1,167 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs
new file mode 100644
index 0000000000000..723450cce1645
--- /dev/null
+++ b/src/connector/src/sink/opendal_sink/mod.rs
@@ -0,0 +1,167 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod gcs;
+pub mod s3;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef};
+use async_trait::async_trait;
+use icelake::config::ParquetWriterConfig;
+use opendal::{Operator, Writer as OpendalWriter};
+use parquet::arrow::AsyncArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
+use risingwave_common::buffer::Bitmap;
+use risingwave_common::catalog::Schema;
+
+use crate::sink::{Result, SinkError, SinkWriter};
+
+const SINK_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
+
+pub struct OpenDalSinkWriter {
+    schema: SchemaRef,
+    operator: Operator,
+    sink_writer: Option<AsyncArrowWriter<OpendalWriter>>,
+    pk_indices: Vec<usize>,
+    is_append_only: bool,
+    write_path: String,
+}
+
+#[async_trait]
+impl SinkWriter for OpenDalSinkWriter {
+    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.sink_writer.is_none() {
+            self.create_sink_writer().await?;
+        }
+        if self.is_append_only {
+            self.append_only(chunk).await
+        } else {
+            unimplemented!()
+        }
+    }
+
+    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        Ok(())
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
+        if is_checkpoint {
+            let sink_writer = self
+                .sink_writer
+                .take()
+                .ok_or_else(|| SinkError::Opendal("Can't get sink writer".to_string()))?;
+            sink_writer.close().await?;
+        }
+
+        Ok(())
+    }
+
+    async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl OpenDalSinkWriter {
+    pub fn new(
+        operator: Operator,
+        write_path: &str,
+        rw_schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
+        Ok(Self {
+            schema: Arc::new(arrow_schema),
+            write_path: write_path.to_string(),
+            pk_indices,
+            operator,
+            sink_writer: None,
+            is_append_only,
+        })
+    }
+
+    async fn create_sink_writer(&mut self) -> Result<()> {
+        let object_store_writer = self
+            .operator
+            .writer_with(&self.write_path)
+            .concurrent(8)
+            .buffer(SINK_WRITE_BUFFER_SIZE)
+            .await?;
+        let parquet_config = ParquetWriterConfig::default();
+        let mut props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
+            .set_compression(parquet_config.compression)
+            .set_max_row_group_size(parquet_config.max_row_group_size)
+            .set_write_batch_size(parquet_config.write_batch_size)
+            .set_data_page_size_limit(parquet_config.data_page_size);
+        if let Some(created_by) = parquet_config.created_by.as_ref() {
+            props = props.set_created_by(created_by.to_string());
+        }
+        self.sink_writer = Some(AsyncArrowWriter::try_new(
+            object_store_writer,
+            self.schema.clone(),
+            SINK_WRITE_BUFFER_SIZE,
+            Some(props.build()),
+        )?);
+        Ok(())
+    }
+
+    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+        let (mut chunk, ops) = chunk.compact().into_parts();
+        let filters =
+            chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
+        chunk.set_visibility(filters);
+
+        let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;
+
+        self.sink_writer
+            .as_mut()
+            .ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))?
+            .write(&batch)
+            .await?;
+
+        Ok(())
+    }
+}
+
+fn convert_rw_schema_to_arrow_schema(
+    rw_schema: risingwave_common::catalog::Schema,
+) -> anyhow::Result<arrow_schema::Schema> {
+    let mut schema_fields = HashMap::new();
+    rw_schema.fields.iter().for_each(|field| {
+        let res = schema_fields.insert(&field.name, &field.data_type);
+        // This assert is to make sure there is no duplicate field name in the schema.
+        assert!(res.is_none())
+    });
+    let mut arrow_fields = vec![];
+    for rw_field in &rw_schema.fields {
+        let converted_arrow_data_type =
+            ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?;
+        arrow_fields.push(ArrowField::new(
+            rw_field.name.clone(),
+            converted_arrow_data_type,
+            false,
+        ));
+    }
+
+    Ok(arrow_schema::Schema::new(arrow_fields))
+}
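The densest line above is the visibility mask in `append_only`: the chunk's visibility bitmap is AND-ed with a bitmap that is true only for `Insert` ops, so deletes and updates are filtered out before the record batch is built. The same masking logic in miniature, with `Vec<bool>` standing in for RisingWave's `Bitmap` type:

```rust
// Standalone illustration of the masking in append_only.
#[derive(PartialEq)]
enum Op {
    Insert,
    Delete,
}

fn mask_inserts(visibility: &[bool], ops: &[Op]) -> Vec<bool> {
    visibility
        .iter()
        .zip(ops)
        // A row survives only if it is both visible and an insert.
        .map(|(vis, op)| *vis && *op == Op::Insert)
        .collect()
}
```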
chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::(); + chunk.set_visibility(filters); + + let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?; + + self.sink_writer + .as_mut() + .ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))? + .write(&batch) + .await?; + + Ok(()) + } +} + +fn convert_rw_schema_to_arrow_schema( + rw_schema: risingwave_common::catalog::Schema, +) -> anyhow::Result { + let mut schema_fields = HashMap::new(); + rw_schema.fields.iter().for_each(|field| { + let res = schema_fields.insert(&field.name, &field.data_type); + // This assert is to make sure there is no duplicate field name in the schema. + assert!(res.is_none()) + }); + let mut arrow_fields = vec![]; + for rw_field in &rw_schema.fields { + let converted_arrow_data_type = + ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?; + arrow_fields.push(ArrowField::new( + rw_field.name.clone(), + converted_arrow_data_type, + false, + )); + } + + Ok(arrow_schema::Schema::new(arrow_fields)) +} diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs new file mode 100644 index 0000000000000..dc0a19fe92731 --- /dev/null +++ b/src/connector/src/sink/opendal_sink/s3.rs @@ -0,0 +1,186 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+use std::collections::HashMap;
+
+use anyhow::anyhow;
+use opendal::layers::{LoggingLayer, RetryLayer};
+use opendal::services::S3;
+use opendal::Operator;
+use risingwave_common::catalog::Schema;
+use serde::Deserialize;
+use serde_with::serde_as;
+use with_options::WithOptions;
+
+use crate::sink::opendal_sink::OpenDalSinkWriter;
+use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
+use crate::sink::{
+    DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
+    SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
+};
+
+#[derive(Deserialize, Debug, Clone, WithOptions)]
+pub struct S3Common {
+    #[serde(rename = "s3.region_name")]
+    pub region_name: String,
+    #[serde(rename = "s3.bucket_name")]
+    pub bucket_name: String,
+    #[serde(rename = "s3.path", default)]
+    pub path: String,
+    #[serde(rename = "s3.credentials.access", default)]
+    pub access: Option<String>,
+    #[serde(rename = "s3.credentials.secret", default)]
+    pub secret: Option<String>,
+    #[serde(rename = "s3.endpoint_url")]
+    pub endpoint_url: Option<String>,
+    #[serde(rename = "s3.assume_role", default)]
+    pub assume_role: Option<String>,
+}
+
+#[serde_as]
+#[derive(Clone, Debug, Deserialize, WithOptions)]
+pub struct S3Config {
+    #[serde(flatten)]
+    pub common: S3Common,
+
+    pub r#type: String, // accept "append-only" or "upsert"
+}
+
+pub const S3_SINK: &str = "s3";
+
+impl S3Config {
+    pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
+        let config =
+            serde_json::from_value::<S3Config>(serde_json::to_value(properties).unwrap())
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
+            return Err(SinkError::Config(anyhow!(
+                "`{}` must be {}, or {}",
+                SINK_TYPE_OPTION,
+                SINK_TYPE_APPEND_ONLY,
+                SINK_TYPE_UPSERT
+            )));
+        }
+        Ok(config)
+    }
+}
+
+#[derive(Debug)]
+pub struct S3Sink {
+    pub config: S3Config,
+    schema: Schema,
+    pk_indices: Vec<usize>,
+    is_append_only: bool,
+}
+
+impl S3Sink {
+    pub fn new(
+        config: S3Config,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        is_append_only: bool,
+    ) -> Result<Self> {
+        Ok(Self {
+            config,
+            schema,
+            pk_indices,
+            is_append_only,
+        })
+    }
+}
+
+impl S3Sink {
+    pub fn new_s3_sink(config: S3Config) -> Result<Operator> {
+        // Create s3 builder.
+        let mut builder = S3::default();
+        builder.bucket(&config.common.bucket_name);
+        builder.region(&config.common.region_name);
+
+        if let Some(endpoint_url) = config.common.endpoint_url {
+            builder.endpoint(&endpoint_url);
+        }
+
+        if let Some(access) = config.common.access {
+            builder.access_key_id(&access);
+        } else {
+            tracing::error!(
+                "access key id of aws s3 is not set, bucket {}",
+                config.common.bucket_name
+            );
+        }
+
+        if let Some(secret) = config.common.secret {
+            builder.secret_access_key(&secret);
+        } else {
+            tracing::error!(
+                "secret access key of aws s3 is not set, bucket {}",
+                config.common.bucket_name
+            );
+        }
+
+        builder.enable_virtual_host_style();
+
+        if let Some(assume_role) = config.common.assume_role {
+            builder.role_arn(&assume_role);
+        }
+
+        let operator: Operator = Operator::new(builder)?
+            .layer(LoggingLayer::default())
+            .layer(RetryLayer::default())
+            .finish();
+
+        Ok(operator)
+    }
+}
+
+impl Sink for S3Sink {
+    type Coordinator = DummySinkCommitCoordinator;
+    type LogSinker = LogSinkerOf<OpenDalSinkWriter>;
+
+    const SINK_NAME: &'static str = S3_SINK;
+
+    async fn validate(&self) -> Result<()> {
+        let _op = Self::new_s3_sink(self.config.clone())?;
+        Ok(())
+    }
+
+    async fn new_log_sinker(
+        &self,
+        writer_param: crate::sink::SinkWriterParam,
+    ) -> Result<Self::LogSinker> {
+        let op = Self::new_s3_sink(self.config.clone())?;
+        let path = self.config.common.path.as_ref();
+        Ok(OpenDalSinkWriter::new(
+            op,
+            path,
+            self.schema.clone(),
+            self.pk_indices.clone(),
+            self.is_append_only,
+        )?
+        .into_log_sinker(writer_param.sink_metrics))
+    }
+}
+
+impl TryFrom<SinkParam> for S3Sink {
+    type Error = SinkError;
+
+    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
+        let schema = param.schema();
+        let config = S3Config::from_hashmap(param.properties)?;
+        S3Sink::new(
+            config,
+            schema,
+            param.downstream_pk,
+            param.sink_type.is_append_only(),
+        )
+    }
+}
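Taken together, a sink instance is driven purely by the property map from the `CREATE SINK` statement: `TryFrom<SinkParam>` parses it into `S3Config`/`GcsConfig`, `new_s3_sink`/`new_gcs_sink` builds the OpenDAL `Operator`, and `OpenDalSinkWriter` streams Parquet into it. A hand-wired sketch of that flow for S3 (all values are placeholders; assumes this runs inside the connector crate):

```rust
// Sketch only: builds the S3 operator the same way new_log_sinker does.
use std::collections::HashMap;

fn build_s3_operator_example() -> crate::sink::Result<opendal::Operator> {
    let props: HashMap<String, String> = [
        ("s3.region_name", "us-east-1"),      // placeholder region
        ("s3.bucket_name", "my-bucket"),      // placeholder bucket
        ("s3.path", "sink/output.parquet"),   // object the writer appends to
        ("s3.credentials.access", "AKIA..."), // placeholder access key
        ("s3.credentials.secret", "secret"),  // placeholder secret key
        ("type", "append-only"),              // must be "append-only" or "upsert"
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    let config = S3Config::from_hashmap(props)?;
    S3Sink::new_s3_sink(config)
}
```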