diff --git a/src/connector/src/sink/opendal_sink/gcs.rs b/src/connector/src/sink/opendal_sink/gcs.rs
index 0dd2d0479f2e9..2715fde48a0ae 100644
--- a/src/connector/src/sink/opendal_sink/gcs.rs
+++ b/src/connector/src/sink/opendal_sink/gcs.rs
@@ -12,19 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use anyhow::anyhow;
 use opendal::layers::{LoggingLayer, RetryLayer};
 use opendal::services::Gcs;
-use opendal::{Operator, Writer as OpendalWriter};
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use opendal::Operator;
 use risingwave_common::catalog::Schema;
 use serde::Deserialize;
 use serde_with::serde_as;
 use with_options::WithOptions;
 
-use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
+use crate::sink::opendal_sink::OpenDalSinkWriter;
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
@@ -148,21 +146,10 @@ impl Sink for GcsSink {
     ) -> Result<Self::LogSinker> {
         let op = Self::new_gcs_sink(self.config.clone())?;
         let path = self.config.common.path.as_ref();
-        let gcs_writer = op
-            .writer_with(path)
-            .concurrent(8)
-            .buffer(GCS_WRITE_BUFFER_SIZE)
-            .await?;
-
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
-            gcs_writer,
-            Arc::new(arrow_schema),
-            GCS_WRITE_BUFFER_SIZE,
-            None,
-        )?;
+
         Ok(OpenDalSinkWriter::new(
-            sink_writer,
+            op,
+            path,
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,
diff --git a/src/connector/src/sink/opendal_sink/mod.rs b/src/connector/src/sink/opendal_sink/mod.rs
index 5a9099ce33ccb..8415d9de84c70 100644
--- a/src/connector/src/sink/opendal_sink/mod.rs
+++ b/src/connector/src/sink/opendal_sink/mod.rs
@@ -14,33 +14,39 @@
 
 pub mod gcs;
 pub mod s3;
-
+use std::collections::HashMap;
 use std::sync::Arc;
-
+use anyhow::anyhow;
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef};
 use async_trait::async_trait;
-use opendal::Writer as OpendalWriter;
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use icelake::config::ParquetWriterConfig;
+use opendal::{Operator, Writer as OpendalWriter};
+use parquet::arrow::AsyncArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
 use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
 
+use crate::sink::{Result, SinkError, SinkWriter};
-use crate::sink::encoder::{RowEncoder};
-use crate::sink::{Result, SinkWriter};
-
-pub const GCS_SINK: &str = "gcs";
+const SINK_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
 
 pub struct OpenDalSinkWriter {
-    schema: Schema,
-    writer: AsyncArrowWriter<OpendalWriter>,
+    schema: SchemaRef,
+    operator: Operator,
+    sink_writer: Option<AsyncArrowWriter<OpendalWriter>>,
     pk_indices: Vec<usize>,
     is_append_only: bool,
+    write_path: String,
 }
 
 #[async_trait]
 impl SinkWriter for OpenDalSinkWriter {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
+        if self.sink_writer.is_none() {
+            self.create_sink_writer().await?;
+        }
         if self.is_append_only {
             self.append_only(chunk).await
         } else {
@@ -58,7 +64,11 @@ impl SinkWriter for OpenDalSinkWriter {
 
     async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
         if is_checkpoint {
-            todo!()
+            let sink_writer = self
+                .sink_writer
+                .take()
+                .ok_or_else(|| SinkError::Opendal("Can't get sink writer".to_string()))?;
+            sink_writer.close().await?;
         }
 
         Ok(())
@@ -71,34 +81,87 @@ impl SinkWriter for OpenDalSinkWriter {
 
 impl OpenDalSinkWriter {
     pub fn new(
-        writer: AsyncArrowWriter<OpendalWriter>,
-        schema: Schema,
+        operator: Operator,
+        write_path: &str,
+        rw_schema: Schema,
         pk_indices: Vec<usize>,
         is_append_only: bool,
     ) -> Result<Self> {
+        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
         Ok(Self {
-            schema: schema.clone(),
+            schema: Arc::new(arrow_schema),
+            write_path: write_path.to_string(),
             pk_indices,
-            writer,
+            operator,
+            sink_writer: None,
             is_append_only,
         })
     }
 
+    async fn create_sink_writer(&mut self) -> Result<()> {
+        let object_store_writer = self
+            .operator
+            .writer_with(&self.write_path)
+            .concurrent(8)
+            .buffer(SINK_WRITE_BUFFER_SIZE)
+            .await?;
+        let parquet_config = ParquetWriterConfig::default();
+        let mut props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
+            .set_compression(parquet_config.compression)
+            .set_max_row_group_size(parquet_config.max_row_group_size)
+            .set_write_batch_size(parquet_config.write_batch_size)
+            .set_data_page_size_limit(parquet_config.data_page_size);
+        if let Some(created_by) = parquet_config.created_by.as_ref() {
+            props = props.set_created_by(created_by.to_string());
+        }
+        self.sink_writer = Some(AsyncArrowWriter::try_new(
+            object_store_writer,
+            self.schema.clone(),
+            SINK_WRITE_BUFFER_SIZE,
+            Some(props.build()),
+        )?);
+        Ok(())
+    }
+
     async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
         let (mut chunk, ops) = chunk.compact().into_parts();
         let filters =
             chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
         chunk.set_visibility(filters);
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let batch = to_record_batch_with_schema(Arc::new(arrow_schema), &chunk.compact())?;
-        self.writer.write(&batch).await?;
+
+        let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;
+
+        self.sink_writer
+            .as_mut()
+            .ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))?
+            .write(&batch)
+            .await?;
 
         Ok(())
     }
 }
 
-fn change_schema_to_arrow_schema(
-    _schema: risingwave_common::catalog::Schema,
-) -> arrow_schema::Schema {
-    todo!()
+fn convert_rw_schema_to_arrow_schema(
+    rw_schema: risingwave_common::catalog::Schema,
+) -> anyhow::Result<arrow_schema::Schema> {
+    let mut schema_fields = HashMap::new();
+    rw_schema.fields.iter().for_each(|field| {
+        let res = schema_fields.insert(&field.name, &field.data_type);
+        // This assert is to make sure there is no duplicate field name in the schema.
+        assert!(res.is_none())
+    });
+    let mut arrow_fields = vec![];
+    for rw_field in &rw_schema.fields {
+        let converted_arrow_data_type =
+            ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?;
+        arrow_fields.push(ArrowField::new(
+            rw_field.name.clone(),
+            converted_arrow_data_type,
+            false,
+        ));
+    }
+
+    Ok(arrow_schema::Schema::new(arrow_fields))
 }
diff --git a/src/connector/src/sink/opendal_sink/s3.rs b/src/connector/src/sink/opendal_sink/s3.rs
index ec60ef2ab16c8..dc0a19fe92731 100644
--- a/src/connector/src/sink/opendal_sink/s3.rs
+++ b/src/connector/src/sink/opendal_sink/s3.rs
@@ -12,27 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 use std::collections::HashMap;
-use std::sync::Arc;
 
 use anyhow::anyhow;
 use opendal::layers::{LoggingLayer, RetryLayer};
 use opendal::services::S3;
-use opendal::{Operator, Writer as OpendalWriter};
-use parquet::arrow::async_writer::AsyncArrowWriter;
+use opendal::Operator;
 use risingwave_common::catalog::Schema;
 use serde::Deserialize;
 use serde_with::serde_as;
 use with_options::WithOptions;
 
-use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
+use crate::sink::opendal_sink::OpenDalSinkWriter;
 use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
 use crate::sink::{
     DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
     SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
 };
 
-const S3_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;
-
 #[derive(Deserialize, Debug, Clone, WithOptions)]
 pub struct S3Common {
     #[serde(rename = "s3.region_name")]
@@ -163,22 +159,9 @@ impl Sink for S3Sink {
     ) -> Result<Self::LogSinker> {
         let op = Self::new_s3_sink(self.config.clone())?;
         let path = self.config.common.path.as_ref();
-        let s3_writer = op
-            .writer_with(path)
-            .concurrent(8)
-            .buffer(S3_WRITE_BUFFER_SIZE)
-            .await?;
-
-        let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
-        let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
-            s3_writer,
-            Arc::new(arrow_schema),
-            S3_WRITE_BUFFER_SIZE,
-            None,
-        )?;
-
         Ok(OpenDalSinkWriter::new(
-            sink_writer,
+            op,
+            path,
             self.schema.clone(),
             self.pk_indices.clone(),
             self.is_append_only,