diff --git a/Cargo.lock b/Cargo.lock
index 11fccb96d43a..60828b081b09 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3017,6 +3017,7 @@ dependencies = [
"log",
"opendal",
"parquet",
+ "rand 0.8.5",
"serde",
"storages-common-index",
"storages-common-pruner",
diff --git a/src/query/functions/src/srfs/mod.rs b/src/query/functions/src/srfs/mod.rs
index 26c58664eef1..41e14ad55907 100644
--- a/src/query/functions/src/srfs/mod.rs
+++ b/src/query/functions/src/srfs/mod.rs
@@ -12,13 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use common_expression::FunctionRegistry;
-
mod array;
mod variant;
-pub use variant::FlattenGenerator;
-pub use variant::FlattenMode;
+use common_expression::FunctionRegistry;
pub fn register(registry: &mut FunctionRegistry) {
array::register(registry);
diff --git a/src/query/functions/src/srfs/variant.rs b/src/query/functions/src/srfs/variant.rs
index eee807616b92..3720f70eebf2 100644
--- a/src/query/functions/src/srfs/variant.rs
+++ b/src/query/functions/src/srfs/variant.rs
@@ -14,12 +14,15 @@
use std::sync::Arc;
+use common_expression::types::nullable::NullableColumnBuilder;
use common_expression::types::string::StringColumnBuilder;
use common_expression::types::AnyType;
use common_expression::types::DataType;
+use common_expression::types::NullableType;
use common_expression::types::NumberDataType;
use common_expression::types::StringType;
use common_expression::types::UInt64Type;
+use common_expression::types::ValueType;
use common_expression::types::VariantType;
use common_expression::Column;
use common_expression::FromData;
@@ -206,18 +209,20 @@ pub fn register(registry: &mut FunctionRegistry) {
signature: FunctionSignature {
name: "json_each".to_string(),
args_type: args_type.to_vec(),
- return_type: DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Tuple(
- vec![DataType::String, DataType::Variant],
- )))]),
+ return_type: DataType::Tuple(vec![
+ DataType::Nullable(Box::new(DataType::String)),
+ DataType::Nullable(Box::new(DataType::Variant)),
+ ]),
},
eval: FunctionEval::SRF {
eval: Box::new(|args, ctx, max_nums_per_row| {
let arg = args[0].clone().to_owned();
(0..ctx.num_rows)
.map(|row| match arg.index(row).unwrap() {
- ScalarRef::Null => {
- (Value::Scalar(Scalar::Tuple(vec![Scalar::Null])), 0)
- }
+ ScalarRef::Null => (
+ Value::Scalar(Scalar::Tuple(vec![Scalar::Null, Scalar::Null])),
+ 0,
+ ),
ScalarRef::Variant(val) => {
unnest_variant_obj(val, row, max_nums_per_row)
}
@@ -233,7 +238,7 @@ pub fn register(registry: &mut FunctionRegistry) {
"flatten".to_string(),
FunctionProperty::default().kind(FunctionKind::SRF),
);
- registry.register_function_factory("flatten", |_, args_type| {
+ registry.register_function_factory("flatten", |params, args_type| {
if args_type.is_empty() || args_type.len() > 5 {
return None;
}
@@ -264,24 +269,23 @@ pub fn register(registry: &mut FunctionRegistry) {
{
return None;
}
+ let params = params.to_vec();
Some(Arc::new(Function {
signature: FunctionSignature {
name: "flatten".to_string(),
args_type: args_type.to_vec(),
- return_type: DataType::Tuple(vec![DataType::Nullable(Box::new(DataType::Tuple(
- vec![
- DataType::Number(NumberDataType::UInt64),
- DataType::Nullable(Box::new(DataType::String)),
- DataType::Nullable(Box::new(DataType::String)),
- DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
- DataType::Nullable(Box::new(DataType::Variant)),
- DataType::Nullable(Box::new(DataType::Variant)),
- ],
- )))]),
+ return_type: DataType::Tuple(vec![
+ DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
+ DataType::Nullable(Box::new(DataType::String)),
+ DataType::Nullable(Box::new(DataType::String)),
+ DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))),
+ DataType::Nullable(Box::new(DataType::Variant)),
+ DataType::Nullable(Box::new(DataType::Variant)),
+ ]),
},
eval: FunctionEval::SRF {
- eval: Box::new(|args, ctx, max_nums_per_row| {
+ eval: Box::new(move |args, ctx, max_nums_per_row| {
let arg = args[0].clone().to_owned();
let mut json_path = None;
@@ -401,7 +405,17 @@ pub fn register(registry: &mut FunctionRegistry) {
{
match arg.index(row).unwrap() {
ScalarRef::Null => {
- results.push((Value::Scalar(Scalar::Tuple(vec![Scalar::Null])), 0));
+ results.push((
+ Value::Scalar(Scalar::Tuple(vec![
+ Scalar::Null,
+ Scalar::Null,
+ Scalar::Null,
+ Scalar::Null,
+ Scalar::Null,
+ Scalar::Null,
+ ])),
+ 0,
+ ));
}
ScalarRef::Variant(val) => {
let columns = match json_path {
@@ -414,15 +428,19 @@ pub fn register(registry: &mut FunctionRegistry) {
&mut builder.offsets,
);
let inner_val = builder.pop().unwrap_or_default();
- generator.generate((row + 1) as u64, &inner_val, path)
+ generator.generate(
+ (row + 1) as u64,
+ &inner_val,
+ path,
+ ¶ms,
+ )
}
- None => generator.generate((row + 1) as u64, val, ""),
+ None => generator.generate((row + 1) as u64, val, "", ¶ms),
};
let len = columns[0].len();
*max_nums_per_row = std::cmp::max(*max_nums_per_row, len);
- let inner_col = Column::Tuple(columns).wrap_nullable(None);
- results.push((Value::Column(Column::Tuple(vec![inner_col])), len));
+ results.push((Value::Column(Column::Tuple(columns)), len));
}
_ => unreachable!(),
}
@@ -478,32 +496,34 @@ fn unnest_variant_obj(
val_builder.commit_row();
}
- let key_col = Column::String(key_builder.build());
- let val_col = Column::Variant(val_builder.build());
- let tuple_col = Column::Tuple(vec![key_col, val_col]).wrap_nullable(None);
+ let key_col = Column::String(key_builder.build()).wrap_nullable(None);
+ let val_col = Column::Variant(val_builder.build()).wrap_nullable(None);
- (Value::Column(Column::Tuple(vec![tuple_col])), len)
+ (Value::Column(Column::Tuple(vec![key_col, val_col])), len)
}
- _ => (Value::Scalar(Scalar::Tuple(vec![Scalar::Null])), 0),
+ _ => (
+ Value::Scalar(Scalar::Tuple(vec![Scalar::Null, Scalar::Null])),
+ 0,
+ ),
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
-pub enum FlattenMode {
+enum FlattenMode {
Both,
Object,
Array,
}
#[derive(Copy, Clone)]
-pub struct FlattenGenerator {
+struct FlattenGenerator {
outer: bool,
recursive: bool,
mode: FlattenMode,
}
impl FlattenGenerator {
- pub fn create(outer: bool, recursive: bool, mode: FlattenMode) -> FlattenGenerator {
+ fn create(outer: bool, recursive: bool, mode: FlattenMode) -> FlattenGenerator {
Self {
outer,
recursive,
@@ -516,22 +536,59 @@ impl FlattenGenerator {
&mut self,
input: &[u8],
path: &str,
- keys: &mut Vec