Skip to content

Commit

Permalink
fix(query): Add LegacyScalar & LegacyColumn (#14264)
Browse files Browse the repository at this point in the history
* new test case test_msg_pack_enum_reorder

* fix-279

* fix(query): add test_bincode_backward_compat_scalar

* fix(query): add test_bincode_backward_compat_scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

* fix(query): add conversion between LegacyScalar and Scalar

---------

Co-authored-by: dantengsky <[email protected]>
  • Loading branch information
sundy-li and dantengsky authored Jan 8, 2024
1 parent c45e575 commit 5fc7767
Show file tree
Hide file tree
Showing 8 changed files with 380 additions and 6 deletions.
51 changes: 51 additions & 0 deletions src/common/io/tests/it/bincode_serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,54 @@ fn test_serialize_standard_deserialize_legacy() {
bincode_deserialize_from_slice_with_config(slice, BincodeConfig::Legacy);
assert!(result.is_err());
}

#[test]
fn test_serialize_bincode_enum() {
#[derive(Serialize, Deserialize, PartialEq, Debug)]
enum Scalar {
Null,
Int(i8),
String(Vec<u8>),
Float(f32),
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
enum Scalar2 {
Null,
Int(i8),
String(Vec<u8>),
Float(f32),
Binary(Vec<u8>),
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
enum Scalar3 {
Null,
Int(i8),
Binary(Vec<u8>),
String(Vec<u8>),
Float(f32),
}

let value = vec![
Scalar::Null,
Scalar::Float(2.3f32),
Scalar::Float(3.2f32),
Scalar::String(vec![1, 2, 3]),
];
let mut buffer = Cursor::new(Vec::new());
bincode_serialize_into_buf_with_config(&mut buffer, &value, BincodeConfig::Standard).unwrap();
let slice = buffer.get_ref().as_slice();

let deserialized: Vec<Scalar> =
bincode_deserialize_from_slice_with_config(slice, BincodeConfig::Standard).unwrap();

let deserialized2: Vec<Scalar2> =
bincode_deserialize_from_slice_with_config(slice, BincodeConfig::Standard).unwrap();

assert!(format!("{:?}", deserialized) == format!("{:?}", deserialized2));

let deserialized3: Result<Vec<Scalar3>> =
bincode_deserialize_from_slice_with_config(slice, BincodeConfig::Standard);
assert!(deserialized3.is_err());
}
218 changes: 218 additions & 0 deletions src/query/expression/src/converts/bincode/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// DO NOT EDIT.
// This crate keeps some legacy codes for compatibility

use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_exception::Result;
use enum_as_inner::EnumAsInner;
use serde::Deserialize;
use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;

use crate::types::array::ArrayColumn;
use crate::types::decimal::DecimalColumn;
use crate::types::decimal::DecimalScalar;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::number::NumberScalar;
use crate::types::string::StringColumn;
use crate::types::*;
use crate::Column;
use crate::Scalar;

#[derive(Clone, Serialize, Deserialize)]
pub enum LegacyScalar {
Null,
EmptyArray,
EmptyMap,
Number(NumberScalar),
Decimal(DecimalScalar),
Timestamp(i64),
Date(i32),
Boolean(bool),
String(Vec<u8>),
Array(LegacyColumn),
Map(LegacyColumn),
Bitmap(Vec<u8>),
Tuple(Vec<Scalar>),
Variant(Vec<u8>),
}

#[derive(Clone, EnumAsInner)]
pub enum LegacyColumn {
Null { len: usize },
EmptyArray { len: usize },
EmptyMap { len: usize },
Number(NumberColumn),
Decimal(DecimalColumn),
Boolean(Bitmap),
String(StringColumn),
Timestamp(Buffer<i64>),
Date(Buffer<i32>),
Array(Box<LegacyArrayColumn>),
Map(Box<LegacyArrayColumn>),
Bitmap(StringColumn),
Nullable(Box<LegacyNullableColumn>),
Tuple(Vec<LegacyColumn>),
Variant(StringColumn),
}

#[derive(Clone)]
pub struct LegacyArrayColumn {
pub values: LegacyColumn,
pub offsets: Buffer<u64>,
}

#[derive(Clone)]
pub struct LegacyNullableColumn {
pub column: LegacyColumn,
pub validity: Bitmap,
}

impl From<LegacyScalar> for Scalar {
fn from(value: LegacyScalar) -> Self {
match value {
LegacyScalar::Null => Scalar::Null,
LegacyScalar::EmptyArray => Scalar::EmptyArray,
LegacyScalar::EmptyMap => Scalar::EmptyMap,
LegacyScalar::Number(num_scalar) => Scalar::Number(num_scalar),
LegacyScalar::Decimal(dec_scalar) => Scalar::Decimal(dec_scalar),
LegacyScalar::Timestamp(ts) => Scalar::Timestamp(ts),
LegacyScalar::Date(date) => Scalar::Date(date),
LegacyScalar::Boolean(b) => Scalar::Boolean(b),
LegacyScalar::String(s) => Scalar::String(s),
LegacyScalar::Array(col) => Scalar::Array(col.into()),
LegacyScalar::Map(col) => Scalar::Map(col.into()),
LegacyScalar::Bitmap(bmp) => Scalar::Bitmap(bmp),
LegacyScalar::Tuple(tuple) => Scalar::Tuple(tuple),
LegacyScalar::Variant(variant) => Scalar::Variant(variant),
}
}
}

impl From<LegacyColumn> for Column {
fn from(value: LegacyColumn) -> Self {
match value {
LegacyColumn::Null { len } => Column::Null { len },
LegacyColumn::EmptyArray { len } => Column::EmptyArray { len },
LegacyColumn::EmptyMap { len } => Column::EmptyMap { len },
LegacyColumn::Number(num_col) => Column::Number(num_col),
LegacyColumn::Decimal(dec_col) => Column::Decimal(dec_col),
LegacyColumn::Boolean(bmp) => Column::Boolean(bmp),
LegacyColumn::String(str_col) => Column::String(str_col),
LegacyColumn::Timestamp(buf) => Column::Timestamp(buf),
LegacyColumn::Date(buf) => Column::Date(buf),
LegacyColumn::Array(arr_col) => Column::Array(Box::new(ArrayColumn::<AnyType> {
values: arr_col.values.into(),
offsets: arr_col.offsets,
})),
LegacyColumn::Map(map_col) => Column::Map(Box::new(ArrayColumn::<AnyType> {
values: map_col.values.into(),
offsets: map_col.offsets,
})),
LegacyColumn::Bitmap(str_col) => Column::Bitmap(str_col),
LegacyColumn::Nullable(nullable_col) => {
Column::Nullable(Box::new(NullableColumn::<AnyType> {
column: nullable_col.column.into(),
validity: nullable_col.validity,
}))
}
LegacyColumn::Tuple(tuple) => {
Column::Tuple(tuple.into_iter().map(|c| c.into()).collect())
}
LegacyColumn::Variant(variant) => Column::Variant(variant),
}
}
}

impl From<Scalar> for LegacyScalar {
fn from(value: Scalar) -> Self {
match value {
Scalar::Null => LegacyScalar::Null,
Scalar::EmptyArray => LegacyScalar::EmptyArray,
Scalar::EmptyMap => LegacyScalar::EmptyMap,
Scalar::Number(num_scalar) => LegacyScalar::Number(num_scalar),
Scalar::Decimal(dec_scalar) => LegacyScalar::Decimal(dec_scalar),
Scalar::Timestamp(ts) => LegacyScalar::Timestamp(ts),
Scalar::Date(date) => LegacyScalar::Date(date),
Scalar::Boolean(b) => LegacyScalar::Boolean(b),
Scalar::Binary(_) => unreachable!(),
Scalar::String(string) => LegacyScalar::String(string),
Scalar::Array(column) => LegacyScalar::Array(column.into()),
Scalar::Map(column) => LegacyScalar::Map(column.into()),
Scalar::Bitmap(bitmap) => LegacyScalar::Bitmap(bitmap),
Scalar::Tuple(tuple) => LegacyScalar::Tuple(tuple),
Scalar::Variant(variant) => LegacyScalar::Variant(variant),
}
}
}

impl From<Column> for LegacyColumn {
fn from(value: Column) -> Self {
match value {
Column::Null { len } => LegacyColumn::Null { len },
Column::EmptyArray { len } => LegacyColumn::EmptyArray { len },
Column::EmptyMap { len } => LegacyColumn::EmptyMap { len },
Column::Number(num_col) => LegacyColumn::Number(num_col),
Column::Decimal(dec_col) => LegacyColumn::Decimal(dec_col),
Column::Boolean(bmp) => LegacyColumn::Boolean(bmp),
Column::Binary(_) => unreachable!(),
Column::String(str_col) => LegacyColumn::String(str_col),
Column::Timestamp(buf) => LegacyColumn::Timestamp(buf),
Column::Date(buf) => LegacyColumn::Date(buf),
Column::Array(arr_col) => LegacyColumn::Array(Box::new(LegacyArrayColumn {
values: arr_col.values.into(),
offsets: arr_col.offsets,
})),
Column::Map(map_col) => LegacyColumn::Map(Box::new(LegacyArrayColumn {
values: map_col.values.into(),
offsets: map_col.offsets,
})),
Column::Bitmap(str_col) => LegacyColumn::Bitmap(str_col),
Column::Nullable(nullable_col) => {
LegacyColumn::Nullable(Box::new(LegacyNullableColumn {
column: nullable_col.column.into(),
validity: nullable_col.validity,
}))
}
Column::Tuple(tuple) => {
LegacyColumn::Tuple(tuple.into_iter().map(|c| c.into()).collect())
}
Column::Variant(variant) => LegacyColumn::Variant(variant),
}
}
}

// Serialize a column to a base64 string.
// Because we may use serde::json/bincode to serialize the column, so we wrap it into string
impl Serialize for LegacyColumn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
let c: Column = self.clone().into();

Serialize::serialize(&c, serializer)
}
}

impl<'de> Deserialize<'de> for LegacyColumn {
fn deserialize<D>(deserializer: D) -> Result<LegacyColumn, D::Error>
where D: Deserializer<'de> {
let c: Column = Deserialize::deserialize(deserializer)?;
Ok(c.into())
}
}
1 change: 1 addition & 0 deletions src/query/expression/src/converts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@

pub mod arrow;
pub mod arrow2;
pub mod bincode;
pub mod datavalues;
71 changes: 71 additions & 0 deletions src/query/expression/tests/it/bincode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_expression::converts::bincode::LegacyColumn;
use databend_common_expression::converts::bincode::LegacyScalar;
use databend_common_expression::Column;
use databend_common_expression::Scalar;
use databend_common_io::prelude::bincode_deserialize_from_slice;
use databend_common_io::prelude::bincode_serialize_into_buf;

use crate::rand_block_for_all_types;

/// This test covers scatter.rs.
#[test]
pub fn test_legacy_converts() -> databend_common_exception::Result<()> {
use rand::Rng;

let mut rng = rand::thread_rng();
let test_times = rng.gen_range(5..30);

for _ in 0..test_times {
let rows = rng.gen_range(100..1024);
let random_block = rand_block_for_all_types(rows);
for entry in random_block
.columns()
.iter()
.filter(|c| !c.data_type.remove_nullable().is_binary())
{
let column = entry.value.as_column().unwrap().clone();

let legacy_column: LegacyColumn = column.clone().into();
let convert_back_column: Column = legacy_column.into();
assert_eq!(column, convert_back_column);

let mut v3_scalars = vec![];

for row in 0..rows {
let scalar = entry.value.index(row).unwrap().to_owned();
let legacy_scalar: LegacyScalar = scalar.clone().into();
v3_scalars.push(legacy_scalar.clone());

let convert_back_scalar: Scalar = legacy_scalar.into();
assert_eq!(scalar, convert_back_scalar);
}

let mut data = vec![];
bincode_serialize_into_buf(&mut data, &v3_scalars).unwrap();
let new_scalars: Vec<LegacyScalar> = bincode_deserialize_from_slice(&data).unwrap();

for (a, b) in v3_scalars.into_iter().zip(new_scalars.into_iter()) {
let a: Scalar = a.into();
let b: Scalar = b.into();

assert_eq!(a, b);
}
}
}

Ok(())
}
1 change: 1 addition & 0 deletions src/query/expression/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_expression::DataBlock;

extern crate core;

mod bincode;
mod block;
mod column;
mod common;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#![feature(thread_local)]
#![feature(int_roundings)]
#![allow(clippy::diverging_sub_expression)]
#![feature(assert_matches)]

extern crate core;

Expand Down
Loading

0 comments on commit 5fc7767

Please sign in to comment.