chore(query): replace arrow2 with arrow-rs in `serialize_column` and `deserialize_column` (#16655)

* arrow rs ipc

Signed-off-by: coldWater <[email protected]>

* test

Signed-off-by: coldWater <[email protected]>

* log

Signed-off-by: coldWater <[email protected]>

---------

Signed-off-by: coldWater <[email protected]>
forsaken628 authored Oct 22, 2024
1 parent c0b1ae2 commit 0dffb1e
Showing 6 changed files with 65 additions and 26 deletions.
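
At a glance, the change maps arrow2's IPC entry points onto their arrow-rs counterparts roughly as follows (a summary of the diff below, not an exhaustive API table):

// arrow2 (databend_common_arrow)                       ->  arrow-rs
// io::ipc::write::FileWriter::new + writer.start()     ->  arrow_ipc::writer::FileWriter::try_new_with_options (header written up front)
// writer.write(&Chunk::new(arrays), None)              ->  writer.write(&record_batch)
// io::ipc::write::Compression::LZ4                     ->  arrow_ipc::CompressionType::LZ4_FRAME (behind the "lz4" feature)
// io::ipc::read::read_file_metadata + FileReader::new  ->  arrow_ipc::reader::FileReader::try_new
// chunk.into_arrays().remove(0)                        ->  batch.remove_column(0)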
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
@@ -12,6 +12,7 @@ test = true
 [dependencies]
 arrow-array = { workspace = true }
 arrow-flight = { workspace = true }
+arrow-ipc = { workspace = true, features = ["lz4"] }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 async-backtrace = { workspace = true }
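
The new `lz4` feature is load-bearing: arrow-ipc only compiles its LZ4 codec when the feature is enabled, and an IPC write configured for LZ4 fails at runtime without it (the exact point of failure may vary by arrow version). A minimal sketch of building the options, assuming only the crates named in this diff:

use arrow_ipc::writer::IpcWriteOptions;
use arrow_ipc::CompressionType;
use arrow_schema::ArrowError;

fn lz4_write_options() -> Result<IpcWriteOptions, ArrowError> {
    // Accepts LZ4_FRAME here; the writer errors later, when it actually
    // compresses a batch, if arrow-ipc was built without `features = ["lz4"]`.
    IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))
}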
44 changes: 26 additions & 18 deletions src/query/expression/src/utils/arrow.rs
@@ -16,18 +16,19 @@ use std::io::Cursor;
 use std::io::Read;
 use std::io::Seek;
 use std::io::Write;
-
-use databend_common_arrow::arrow;
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_ipc::reader::FileReader;
+use arrow_ipc::writer::FileWriter;
+use arrow_ipc::writer::IpcWriteOptions;
+use arrow_ipc::CompressionType;
+use arrow_schema::Schema;
 use databend_common_arrow::arrow::array::Array;
 use databend_common_arrow::arrow::bitmap::Bitmap;
 use databend_common_arrow::arrow::bitmap::MutableBitmap;
 use databend_common_arrow::arrow::buffer::Buffer;
-use databend_common_arrow::arrow::datatypes::Schema;
 use databend_common_arrow::arrow::io::ipc::read::read_file_metadata;
-use databend_common_arrow::arrow::io::ipc::read::FileReader;
-use databend_common_arrow::arrow::io::ipc::write::Compression;
-use databend_common_arrow::arrow::io::ipc::write::FileWriter;
-use databend_common_arrow::arrow::io::ipc::write::WriteOptions;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;

@@ -75,13 +76,21 @@ pub fn serialize_column(col: &Column) -> Vec<u8> {
     buffer
 }
 
-pub fn write_column(col: &Column, w: &mut impl Write) -> arrow::error::Result<()> {
-    let schema = Schema::from(vec![col.arrow_field()]);
-    let mut writer = FileWriter::new(w, schema, None, WriteOptions {
-        compression: Some(Compression::LZ4),
-    });
-    writer.start()?;
-    writer.write(&arrow::chunk::Chunk::new(vec![col.as_arrow()]), None)?;
+pub fn write_column(
+    col: &Column,
+    w: &mut impl Write,
+) -> std::result::Result<(), arrow_schema::ArrowError> {
+    let field: arrow_schema::Field = col.arrow_field().into();
+    let schema = Schema::new(vec![field]);
+    let mut writer = FileWriter::try_new_with_options(
+        w,
+        &schema,
+        IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))?,
+    )?;
+
+    let batch = RecordBatch::try_new(Arc::new(schema), vec![col.clone().into_arrow_rs()])?;
+
+    writer.write(&batch)?;
     writer.finish()
 }

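For context, here is a self-contained arrow-rs sketch of the same write path without Databend's Column type (the field name and values are invented for illustration; requires arrow-array, arrow-schema, and arrow-ipc with the lz4 feature):

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_ipc::writer::{FileWriter, IpcWriteOptions};
use arrow_ipc::CompressionType;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn write_one_column_ipc() -> Result<Vec<u8>, ArrowError> {
    // One field per file, as write_column does for a single Column.
    let schema = Schema::new(vec![Field::new("col", DataType::Int32, true)]);
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![array])?;

    let opts = IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    let mut buffer = Vec::new();
    // try_new_with_options writes the IPC header immediately; there is no
    // explicit start() call as in arrow2.
    let mut writer = FileWriter::try_new_with_options(&mut buffer, &schema, opts)?;
    writer.write(&batch)?;
    writer.finish()?; // writes the footer
    drop(writer); // release the &mut borrow of buffer
    Ok(buffer)
}
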
@@ -95,14 +104,13 @@ pub fn read_column<R: Read + Seek>(r: &mut R) -> Result<Column> {
     let f = metadata.schema.fields[0].clone();
     let data_field = DataField::try_from(&f)?;
 
-    let mut reader = FileReader::new(r, metadata, None, None);
+    let mut reader = FileReader::try_new(r, None)?;
     let col = reader
         .next()
         .ok_or_else(|| ErrorCode::Internal("expected one arrow array"))??
-        .into_arrays()
-        .remove(0);
+        .remove_column(0);
 
-    Column::from_arrow(col.as_ref(), data_field.data_type())
+    Column::from_arrow_rs(col, data_field.data_type())
 }
 
 /// Convert a column to a arrow array.
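
And the matching read path, consuming the buffer produced by the write sketch above (same assumptions):

use std::io::Cursor;

use arrow_array::ArrayRef;
use arrow_ipc::reader::FileReader;
use arrow_schema::ArrowError;

fn read_one_column_ipc(buffer: Vec<u8>) -> Result<ArrayRef, ArrowError> {
    // try_new seeks to the footer and reads the schema and block index
    // itself; arrow2 needed a separate read_file_metadata() step first.
    let mut reader = FileReader::try_new(Cursor::new(buffer), None)?;
    let mut batch = reader.next().expect("expected one record batch")?;
    // remove_column(0) takes the single array out of the batch, replacing
    // arrow2's into_arrays().remove(0).
    Ok(batch.remove_column(0))
}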
31 changes: 28 additions & 3 deletions src/query/expression/tests/it/serde.rs
@@ -79,14 +79,39 @@ fn test_serde_bin_column() -> Result<()> {
         StringType::from_data(vec!["SM CASE", "axx", "bxx", "xxe", "eef", "fg"]),
     ];
 
-    for col in columns {
-        let data = serialize_column(&col);
+    for col in &columns {
+        let data = serialize_column(col);
         let t = deserialize_column(&data).unwrap();
-        assert_eq!(col, t);
+        assert_eq!(col, &t);
     }
 
+    for col in &columns {
+        let data = serialize_column_old(col);
+        let t = deserialize_column(&data).unwrap();
+        assert_eq!(col, &t);
+    }
     Ok(())
 }
 
+fn serialize_column_old(col: &Column) -> Vec<u8> {
+    use databend_common_arrow::arrow::chunk::Chunk;
+    use databend_common_arrow::arrow::datatypes::Schema;
+    use databend_common_arrow::arrow::io::ipc::write::FileWriter;
+    use databend_common_arrow::arrow::io::ipc::write::WriteOptions;
+
+    let mut buffer = Vec::new();
+
+    let schema = Schema::from(vec![col.arrow_field()]);
+    let mut writer = FileWriter::new(&mut buffer, schema, None, WriteOptions::default());
+    writer.start().unwrap();
+    writer
+        .write(&Chunk::new(vec![col.as_arrow()]), None)
+        .unwrap();
+    writer.finish().unwrap();
+
+    buffer
+}
+
 #[test]
 fn test_borsh_serde_column() -> Result<()> {
     #[derive(BorshSerialize, BorshDeserialize, Eq, PartialEq, Debug)]
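
The second loop in the test above is the compatibility story: spill files already written in the arrow2 IPC format must remain readable after the switch. Both libraries produce the same Arrow IPC file format, and each message declares its own compression, so the arrow-rs reader needs no special configuration for old uncompressed files. A rough sketch of that property using arrow-rs alone (the default-options writer stands in for arrow2's uncompressed output):

use std::io::Cursor;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn read_uncompressed_file() -> Result<(), ArrowError> {
    let schema = Schema::new(vec![Field::new("col", DataType::Int32, true)]);
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![array])?;

    // Default options: no compression, like arrow2's WriteOptions::default().
    let mut buffer = Vec::new();
    let mut writer = FileWriter::try_new(&mut buffer, &schema)?;
    writer.write(&batch)?;
    writer.finish()?;
    drop(writer);

    // The same reader that handles LZ4 files reads this one unchanged.
    let mut reader = FileReader::try_new(Cursor::new(buffer), None)?;
    assert_eq!(reader.next().expect("one batch")?.num_rows(), 3);
    Ok(())
}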
@@ -389,8 +389,9 @@ pub fn spilling_aggregate_payload<Method: HashMethodBounds>(
     }
 
     info!(
-        "Write aggregate spill {} successfully, elapsed: {:?}",
+        "Write aggregate spill {} successfully, size: {:?} ,elapsed: {:?}",
         location,
+        write_bytes,
         instant.elapsed()
     );
@@ -270,10 +270,12 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Processor
                 }
             };
 
-            info!(
-                "Read {} aggregate spills successfully, total elapsed: {:?}",
-                processed_count, total_elapsed
-            );
+            if processed_count != 0 {
+                info!(
+                    "Read {} aggregate spills successfully, total elapsed: {:?}",
+                    processed_count, total_elapsed
+                );
+            }
         }
     }
 }
