add parquet writer, todo: add e2e test and comments
wcy-fdu committed Mar 14, 2024
1 parent 08d05f0 commit f4618c1
Showing 3 changed files with 94 additions and 61 deletions.
23 changes: 5 additions & 18 deletions src/connector/src/sink/opendal_sink/gcs.rs
@@ -12,19 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::{Operator, Writer as OpendalWriter};
use parquet::arrow::async_writer::AsyncArrowWriter;
use opendal::Operator;
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
use crate::sink::opendal_sink::OpenDalSinkWriter;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
@@ -148,21 +146,10 @@ impl Sink for GcsSink {
) -> Result<Self::LogSinker> {
let op = Self::new_gcs_sink(self.config.clone())?;
let path = self.config.common.path.as_ref();
let gcs_writer = op
.writer_with(path)
.concurrent(8)
.buffer(GCS_WRITE_BUFFER_SIZE)
.await?;

let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
gcs_writer,
Arc::new(arrow_schema),
GCS_WRITE_BUFFER_SIZE,
None,
)?;

Ok(OpenDalSinkWriter::new(
sink_writer,
op,
path,
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
107 changes: 85 additions & 22 deletions src/connector/src/sink/opendal_sink/mod.rs
@@ -14,33 +14,39 @@

pub mod gcs;
pub mod s3;

use std::collections::HashMap;
use std::sync::Arc;


use anyhow::anyhow;
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, SchemaRef};
use async_trait::async_trait;
use opendal::Writer as OpendalWriter;
use parquet::arrow::async_writer::AsyncArrowWriter;
use icelake::config::ParquetWriterConfig;
use opendal::{Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use risingwave_common::array::{to_record_batch_with_schema, Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;

use crate::sink::{Result, SinkError, SinkWriter};

use crate::sink::encoder::RowEncoder;
use crate::sink::{Result, SinkWriter};

pub const GCS_SINK: &str = "gcs";
const SINK_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

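/// Sink writer that encodes stream chunks as Parquet and streams them
/// to an object store (GCS or S3) through OpenDAL. The inner
/// `AsyncArrowWriter` is created lazily on the first write and closed
/// at every checkpoint barrier.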
pub struct OpenDalSinkWriter {
schema: Schema,
writer: AsyncArrowWriter<OpendalWriter>,
schema: SchemaRef,
operator: Operator,
sink_writer: Option<AsyncArrowWriter<OpendalWriter>>,
pk_indices: Vec<usize>,
is_append_only: bool,
write_path: String,
}

#[async_trait]
impl SinkWriter for OpenDalSinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
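// Create the underlying Parquet writer lazily: it is `None` both on
// startup and after each checkpoint barrier closes it.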
if self.sink_writer.is_none() {
self.create_sink_writer().await?;
}
if self.is_append_only {
self.append_only(chunk).await
} else {
@@ -58,7 +64,11 @@ impl SinkWriter for OpenDalSinkWriter {

async fn barrier(&mut self, is_checkpoint: bool) -> Result<()> {
if is_checkpoint {
todo!()
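// On a checkpoint, take and close the writer: closing flushes the
// buffered row groups and writes the Parquet footer, so the object
// is complete. The next `write_batch` recreates the writer.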
let sink_writer = self
.sink_writer
.take()
.ok_or_else(|| SinkError::Opendal("Can't get sink writer".to_string()))?;
sink_writer.close().await?;
}

Ok(())
@@ -71,34 +81,87 @@

impl OpenDalSinkWriter {
pub fn new(
writer: AsyncArrowWriter<OpendalWriter>,
schema: Schema,
operator: Operator,
write_path: &str,
rw_schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Result<Self> {
let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema)?;
Ok(Self {
schema: schema.clone(),
schema: Arc::new(arrow_schema),
write_path: write_path.to_string(),
pk_indices,
writer,
operator,
sink_writer: None,
is_append_only,
})
}

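/// Open an OpenDAL writer at `write_path` and wrap it in an
/// `AsyncArrowWriter` configured with the Parquet properties below.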
async fn create_sink_writer(&mut self) -> Result<()> {
let object_store_writer = self
.operator
.writer_with(&self.write_path)
.concurrent(8)
.buffer(SINK_WRITE_BUFFER_SIZE)
.await?;
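// Derive the Parquet writer properties from icelake's default
// `ParquetWriterConfig`: compression, row-group and batch sizes,
// data-page size limit, and optional bloom filter / created-by.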
let parquet_config = ParquetWriterConfig::default();
let mut props = WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_bloom_filter_enabled(parquet_config.enable_bloom_filter)
.set_compression(parquet_config.compression)
.set_max_row_group_size(parquet_config.max_row_group_size)
.set_write_batch_size(parquet_config.write_batch_size)
.set_data_page_size_limit(parquet_config.data_page_size);
if let Some(created_by) = parquet_config.created_by.as_ref() {
props = props.set_created_by(created_by.to_string());
}
self.sink_writer = Some(AsyncArrowWriter::try_new(
object_store_writer,
self.schema.clone(),
SINK_WRITE_BUFFER_SIZE,
Some(props.build()),
)?);
Ok(())
}

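/// Write only the `Insert` rows of the chunk: mask out other ops via
/// the visibility bitmap, then encode the compacted chunk as an Arrow
/// record batch and hand it to the Parquet writer.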
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
let (mut chunk, ops) = chunk.compact().into_parts();
let filters =
chunk.visibility() & ops.iter().map(|op| *op == Op::Insert).collect::<Bitmap>();
chunk.set_visibility(filters);
let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
let batch = to_record_batch_with_schema(Arc::new(arrow_schema), &chunk.compact())?;
self.writer.write(&batch).await?;

let batch = to_record_batch_with_schema(self.schema.clone(), &chunk.compact())?;

self.sink_writer
.as_mut()
.ok_or_else(|| SinkError::Opendal("Sink writer is not created.".to_string()))?
.write(&batch)
.await?;

Ok(())
}
}

fn change_schema_to_arrow_schema(
_schema: risingwave_common::catalog::Schema,
) -> arrow_schema::Schema {
todo!()
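/// Convert a RisingWave schema into an Arrow schema, failing if a
/// column type has no Arrow counterpart. Every field is currently
/// declared non-nullable.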
fn convert_rw_schema_to_arrow_schema(
rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema::Schema> {
let mut schema_fields = HashMap::new();
rw_schema.fields.iter().for_each(|field| {
let res = schema_fields.insert(&field.name, &field.data_type);
// Assert that there are no duplicate field names in the schema.
assert!(res.is_none());
});
let mut arrow_fields = vec![];
for rw_field in &rw_schema.fields {
let converted_arrow_data_type =
ArrowDataType::try_from(rw_field.data_type.clone()).map_err(|e| anyhow!(e))?;
arrow_fields.push(ArrowField::new(
rw_field.name.clone(),
converted_arrow_data_type,
false,
));
}

Ok(arrow_schema::Schema::new(arrow_fields))
}
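For orientation, here is a minimal sketch of the writer lifecycle this commit introduces, using an in-memory OpenDAL operator and a one-column schema. The `sketch` function, the `Memory` service, and the schema are illustrative assumptions rather than part of this commit, and it assumes the `SinkWriter` trait is in scope.

use opendal::{services::Memory, Operator};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;

use crate::sink::opendal_sink::OpenDalSinkWriter;
use crate::sink::SinkWriter;

async fn sketch(chunk: StreamChunk) -> anyhow::Result<()> {
    // An in-memory operator stands in for the GCS/S3 operators above.
    let op = Operator::new(Memory::default())?.finish();
    let rw_schema = Schema::new(vec![Field::with_name(DataType::Int64, "id")]);
    let mut writer =
        OpenDalSinkWriter::new(op, "t/part-0.parquet", rw_schema, vec![0], true)?;
    // The first batch lazily creates the AsyncArrowWriter.
    writer.write_batch(chunk).await?;
    // A checkpoint barrier closes the writer and finalizes the Parquet file.
    writer.barrier(true).await?;
    Ok(())
}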
25 changes: 4 additions & 21 deletions src/connector/src/sink/opendal_sink/s3.rs
@@ -12,27 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use opendal::{Operator, Writer as OpendalWriter};
use parquet::arrow::async_writer::AsyncArrowWriter;
use opendal::Operator;
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use crate::sink::opendal_sink::{change_schema_to_arrow_schema, OpenDalSinkWriter};
use crate::sink::opendal_sink::OpenDalSinkWriter;
use crate::sink::writer::{LogSinkerOf, SinkWriterExt};
use crate::sink::{
DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY,
SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

const S3_WRITE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
#[serde(rename = "s3.region_name")]
@@ -163,22 +159,9 @@ impl Sink for S3Sink {
) -> Result<Self::LogSinker> {
let op = Self::new_s3_sink(self.config.clone())?;
let path = self.config.common.path.as_ref();
let s3_writer = op
.writer_with(path)
.concurrent(8)
.buffer(S3_WRITE_BUFFER_SIZE)
.await?;

let arrow_schema = change_schema_to_arrow_schema(self.schema.clone());
let sink_writer: AsyncArrowWriter<OpendalWriter> = AsyncArrowWriter::try_new(
s3_writer,
Arc::new(arrow_schema),
S3_WRITE_BUFFER_SIZE,
None,
)?;

Ok(OpenDalSinkWriter::new(
sink_writer,
op,
path,
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,