From 0dffb1eb8d1c0b9ad436bcdc7ca01d262e8dfc97 Mon Sep 17 00:00:00 2001
From: coldWater
Date: Tue, 22 Oct 2024 16:17:22 +0800
Subject: [PATCH] chore(query): replace `arrow2` with `arrow-rs` in
 `serialize_column` and `deserialize_column` (#16655)

* arrow rs ipc

Signed-off-by: coldWater

* test

Signed-off-by: coldWater

* log

Signed-off-by: coldWater

---------

Signed-off-by: coldWater
---
 Cargo.lock                                    |  2 +
 src/query/expression/Cargo.toml               |  1 +
 src/query/expression/src/utils/arrow.rs       | 44 +++++++++++--------
 src/query/expression/tests/it/serde.rs        | 31 +++++++++++--
 .../serde/transform_aggregate_spill_writer.rs |  3 +-
 .../serde/transform_spill_reader.rs           | 10 +++--
 6 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c3baf5dd93ee..aca2915dc516 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -455,6 +455,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "flatbuffers",
+ "lz4_flex",
 ]

 [[package]]
@@ -3443,6 +3444,7 @@ version = "0.1.0"
 dependencies = [
  "arrow-array",
  "arrow-flight",
+ "arrow-ipc",
  "arrow-ord",
  "arrow-schema",
  "arrow-select",
diff --git a/src/query/expression/Cargo.toml b/src/query/expression/Cargo.toml
index 0b2701b18095..08d4232286c4 100644
--- a/src/query/expression/Cargo.toml
+++ b/src/query/expression/Cargo.toml
@@ -12,6 +12,7 @@ test = true
 [dependencies]
 arrow-array = { workspace = true }
 arrow-flight = { workspace = true }
+arrow-ipc = { workspace = true, features = ["lz4"] }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 async-backtrace = { workspace = true }
diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs
index 7d3f20dc0127..3c7b7fbd75c4 100644
--- a/src/query/expression/src/utils/arrow.rs
+++ b/src/query/expression/src/utils/arrow.rs
@@ -16,18 +16,19 @@
 use std::io::Cursor;
 use std::io::Read;
 use std::io::Seek;
 use std::io::Write;
-
-use databend_common_arrow::arrow;
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::FileReader;
+use arrow_ipc::writer::FileWriter;
+use arrow_ipc::writer::IpcWriteOptions;
+use arrow_ipc::CompressionType;
+use arrow_schema::Schema;
 use databend_common_arrow::arrow::array::Array;
 use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_arrow::arrow::bitmap::MutableBitmap;
 use databend_common_arrow::arrow::buffer::Buffer;
-use databend_common_arrow::arrow::datatypes::Schema;
 use databend_common_arrow::arrow::io::ipc::read::read_file_metadata;
-use databend_common_arrow::arrow::io::ipc::read::FileReader;
-use databend_common_arrow::arrow::io::ipc::write::Compression;
-use databend_common_arrow::arrow::io::ipc::write::FileWriter;
-use databend_common_arrow::arrow::io::ipc::write::WriteOptions;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -75,13 +76,21 @@ pub fn serialize_column(col: &Column) -> Vec<u8> {
     buffer
 }

-pub fn write_column(col: &Column, w: &mut impl Write) -> arrow::error::Result<()> {
-    let schema = Schema::from(vec![col.arrow_field()]);
-    let mut writer = FileWriter::new(w, schema, None, WriteOptions {
-        compression: Some(Compression::LZ4),
-    });
-    writer.start()?;
-    writer.write(&arrow::chunk::Chunk::new(vec![col.as_arrow()]), None)?;
+pub fn write_column(
+    col: &Column,
+    w: &mut impl Write,
+) -> std::result::Result<(), arrow_schema::ArrowError> {
+    let field: arrow_schema::Field = col.arrow_field().into();
+    let schema = Schema::new(vec![field]);
+    let mut writer = FileWriter::try_new_with_options(
+        w,
+        &schema,
+        IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))?,
+    )?;
+
+    let batch = RecordBatch::try_new(Arc::new(schema), vec![col.clone().into_arrow_rs()])?;
+
+    writer.write(&batch)?;
     writer.finish()
 }

@@ -95,14 +104,13 @@ pub fn read_column<R: Read + Seek>(r: &mut R) -> Result<Column> {
     let f = metadata.schema.fields[0].clone();
     let data_field = DataField::try_from(&f)?;

-    let mut reader = FileReader::new(r, metadata, None, None);
+    let mut reader = FileReader::try_new(r, None)?;
     let col = reader
         .next()
         .ok_or_else(|| ErrorCode::Internal("expected one arrow array"))??
-        .into_arrays()
-        .remove(0);
+        .remove_column(0);

-    Column::from_arrow(col.as_ref(), data_field.data_type())
+    Column::from_arrow_rs(col, data_field.data_type())
 }

 /// Convert a column to a arrow array.
diff --git a/src/query/expression/tests/it/serde.rs b/src/query/expression/tests/it/serde.rs
index eb61115b25e6..f07bae3157da 100644
--- a/src/query/expression/tests/it/serde.rs
+++ b/src/query/expression/tests/it/serde.rs
@@ -79,14 +79,39 @@ fn test_serde_bin_column() -> Result<()> {
         StringType::from_data(vec!["SM CASE", "axx", "bxx", "xxe", "eef", "fg"]),
     ];
-    for col in columns {
-        let data = serialize_column(&col);
+    for col in &columns {
+        let data = serialize_column(col);
         let t = deserialize_column(&data).unwrap();
-        assert_eq!(col, t);
+        assert_eq!(col, &t);
+    }
+
+    for col in &columns {
+        let data = serialize_column_old(col);
+        let t = deserialize_column(&data).unwrap();
+        assert_eq!(col, &t);
     }

     Ok(())
 }

+fn serialize_column_old(col: &Column) -> Vec<u8> {
+    use databend_common_arrow::arrow::chunk::Chunk;
+    use databend_common_arrow::arrow::datatypes::Schema;
+    use databend_common_arrow::arrow::io::ipc::write::FileWriter;
+    use databend_common_arrow::arrow::io::ipc::write::WriteOptions;
+
+    let mut buffer = Vec::new();
+
+    let schema = Schema::from(vec![col.arrow_field()]);
+    let mut writer = FileWriter::new(&mut buffer, schema, None, WriteOptions::default());
+    writer.start().unwrap();
+    writer
+        .write(&Chunk::new(vec![col.as_arrow()]), None)
+        .unwrap();
+    writer.finish().unwrap();
+
+    buffer
+}
+
 #[test]
 fn test_borsh_serde_column() -> Result<()> {
     #[derive(BorshSerialize, BorshDeserialize, Eq, PartialEq, Debug)]
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs
index 3b3d56586b54..ab721706d7d2 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs
@@ -389,8 +389,9 @@ pub fn spilling_aggregate_payload(
     }

     info!(
-        "Write aggregate spill {} successfully, elapsed: {:?}",
+        "Write aggregate spill {} successfully, size: {:?}, elapsed: {:?}",
         location,
+        write_bytes,
         instant.elapsed()
     );

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
index f625de75db9b..2d1ffd3a0a01 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_spill_reader.rs
@@ -270,10 +270,12 @@ impl Processor
                 }
             };

-            info!(
-                "Read {} aggregate spills successfully, total elapsed: {:?}",
-                processed_count, total_elapsed
-            );
+            if processed_count != 0 {
+                info!(
+                    "Read {} aggregate spills successfully, total elapsed: {:?}",
+                    processed_count, total_elapsed
+                );
+            }
         }
     }
 }
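
Note (editorial, not part of the patch): below is a minimal, self-contained sketch of the arrow-rs IPC round trip that the new `write_column` and `read_column` rely on, using only `arrow-array`, `arrow-ipc` (with the `lz4` feature) and `arrow-schema`. The `Int32Array` column, the "c0" field name and the `main` wrapper are illustrative assumptions; the real functions operate on Databend `Column`s via `into_arrow_rs()` / `from_arrow_rs()`.

    use std::io::Cursor;
    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch};
    use arrow_ipc::reader::FileReader;
    use arrow_ipc::writer::{FileWriter, IpcWriteOptions};
    use arrow_ipc::CompressionType;
    use arrow_schema::{ArrowError, DataType, Field, Schema};

    fn main() -> Result<(), ArrowError> {
        // A single-column batch; "c0" and the Int32 data are placeholders.
        let schema = Schema::new(vec![Field::new("c0", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        // Write an Arrow IPC file with LZ4_FRAME compression into an in-memory buffer,
        // the same writer configuration the new write_column uses.
        let mut buffer = Vec::new();
        let options =
            IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))?;
        let mut writer = FileWriter::try_new_with_options(&mut buffer, &schema, options)?;
        writer.write(&batch)?;
        writer.finish()?;
        drop(writer); // release the borrow of `buffer`

        // Read it back: FileReader iterates RecordBatches; take the first column out,
        // mirroring read_column's remove_column(0).
        let mut reader = FileReader::try_new(Cursor::new(buffer), None)?;
        let mut read_back = reader.next().expect("expected one record batch")?;
        let column = read_back.remove_column(0);
        assert_eq!(column.len(), 3);
        Ok(())
    }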