From 4bbc3074f9b50b71b8aeb4c1ff8cd873e4a2e11e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 21 Feb 2024 15:52:33 -0800 Subject: [PATCH] fix: Appending null values to element array builders of StructBuilder for null row in a StructArray (#78) --- core/src/execution/shuffle/list.rs | 21 +- core/src/execution/shuffle/map.rs | 438 +++++++++--------- core/src/execution/shuffle/row.rs | 183 ++++++-- .../apache/comet/exec/CometShuffleSuite.scala | 49 ++ 4 files changed, 428 insertions(+), 263 deletions(-) diff --git a/core/src/execution/shuffle/list.rs b/core/src/execution/shuffle/list.rs index 21c3b57d5..53d155f85 100644 --- a/core/src/execution/shuffle/list.rs +++ b/core/src/execution/shuffle/list.rs @@ -17,7 +17,7 @@ use crate::{ errors::CometError, - execution::shuffle::row::{append_field, SparkUnsafeObject}, + execution::shuffle::row::{append_field, SparkUnsafeObject, SparkUnsafeRow}, }; use arrow_array::builder::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder, @@ -102,6 +102,7 @@ macro_rules! define_append_element { let is_null = list.is_null_at(idx); if is_null { + // Append a null value to the element builder. element_builder.append_null(); } else { $accessor(element_builder, list, idx); @@ -284,6 +285,7 @@ pub fn append_list_element( let is_null = list.is_null_at(idx); if is_null { + // Append a null value to element builder. element_builder.append_null(); } else { element_builder.append_value(list.get_decimal(idx, *p)) @@ -313,21 +315,22 @@ pub fn append_list_element( } */ DataType::Struct(fields) => { - let element_builder: &mut StructBuilder = list_builder + let struct_builder: &mut StructBuilder = list_builder .values() .as_any_mut() .downcast_mut::() .unwrap(); let is_null = list.is_null_at(idx); - if is_null { - element_builder.append_null(); + let nested_row = if is_null { + SparkUnsafeRow::default() } else { - let nested_row = list.get_struct(idx, fields.len()); - element_builder.append(true); - for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), element_builder, &nested_row, field_idx); - } + list.get_struct(idx, fields.len()) + }; + + struct_builder.append(!is_null); + for (field_idx, field) in fields.into_iter().enumerate() { + append_field(field.data_type(), struct_builder, &nested_row, field_idx)?; } } _ => { diff --git a/core/src/execution/shuffle/map.rs b/core/src/execution/shuffle/map.rs index 1c0f8abb1..014695293 100644 --- a/core/src/execution/shuffle/map.rs +++ b/core/src/execution/shuffle/map.rs @@ -19,7 +19,7 @@ use crate::{ errors::CometError, execution::shuffle::{ list::SparkUnsafeArray, - row::{append_field, downcast_builder_ref, SparkUnsafeObject}, + row::{append_field, downcast_builder_ref, SparkUnsafeObject, SparkUnsafeRow}, }, }; use arrow_array::builder::{ @@ -91,7 +91,7 @@ macro_rules! define_append_map_element { fn $func( map_builder: &mut MapBuilder<$key_builder_type, $value_builder_type>, map: &SparkUnsafeMap, - ) { + ) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -110,7 +110,9 @@ macro_rules! define_append_map_element { $value_accessor(value_builder, values, idx); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } }; } @@ -124,7 +126,7 @@ macro_rules! define_append_map_struct_value_element { map_builder: &mut MapBuilder<$key_builder_type, StructBuilder>, map: &SparkUnsafeMap, fields: &Fields, - ) { + ) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -136,17 +138,21 @@ macro_rules! define_append_map_struct_value_element { $key_accessor(key_builder, keys, idx); let value_builder = downcast_builder_ref!(StructBuilder, map_builder.values()); - if values.is_null_at(idx) { + let nested_row = if values.is_null_at(idx) { value_builder.append_null(); + SparkUnsafeRow::default() } else { - let nested_row = values.get_struct(idx, fields.len()); value_builder.append(true); - for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), value_builder, &nested_row, field_idx); - } + values.get_struct(idx, fields.len()) + }; + + for (field_idx, field) in fields.into_iter().enumerate() { + append_field(field.data_type(), value_builder, &nested_row, field_idx)?; } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } }; } @@ -160,7 +166,7 @@ macro_rules! define_append_map_struct_key_element { map_builder: &mut MapBuilder, map: &SparkUnsafeMap, fields: &Fields, - ) { + ) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -172,7 +178,7 @@ macro_rules! define_append_map_struct_key_element { let nested_row = keys.get_struct(idx, fields.len()); key_builder.append(true); for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), key_builder, &nested_row, field_idx); + append_field(field.data_type(), key_builder, &nested_row, field_idx)?; } let value_builder = @@ -183,7 +189,9 @@ macro_rules! define_append_map_struct_key_element { $value_accessor(value_builder, values, idx); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } }; } @@ -193,7 +201,7 @@ fn append_map_struct_struct_element( map: &SparkUnsafeMap, key_fields: &Fields, value_fields: &Fields, -) { +) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -205,21 +213,25 @@ fn append_map_struct_struct_element( let nested_row = keys.get_struct(idx, key_fields.len()); key_builder.append(true); for (field_idx, field) in key_fields.into_iter().enumerate() { - append_field(field.data_type(), key_builder, &nested_row, field_idx); + append_field(field.data_type(), key_builder, &nested_row, field_idx)?; } let value_builder = downcast_builder_ref!(StructBuilder, map_builder.values()); - if values.is_null_at(idx) { + let nested_row = if values.is_null_at(idx) { value_builder.append_null(); + SparkUnsafeRow::default() } else { - let nested_row = values.get_struct(idx, value_fields.len()); value_builder.append(true); - for (field_idx, field) in value_fields.into_iter().enumerate() { - append_field(field.data_type(), value_builder, &nested_row, field_idx); - } + values.get_struct(idx, value_fields.len()) + }; + + for (field_idx, field) in value_fields.into_iter().enumerate() { + append_field(field.data_type(), value_builder, &nested_row, field_idx)?; } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } /// A macro defining a function to append elements of a map to a map builder with given key builder @@ -231,7 +243,7 @@ macro_rules! define_append_map_decimal_value_element { map_builder: &mut MapBuilder<$key_builder_type, Decimal128Builder>, map: &SparkUnsafeMap, precision: u8, - ) { + ) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -249,7 +261,9 @@ macro_rules! define_append_map_decimal_value_element { value_builder.append_value(values.get_decimal(idx, precision)); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } }; } @@ -263,7 +277,7 @@ macro_rules! define_append_map_decimal_key_element { map_builder: &mut MapBuilder, map: &SparkUnsafeMap, precision: u8, - ) { + ) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -282,7 +296,9 @@ macro_rules! define_append_map_decimal_key_element { $value_accessor(value_builder, values, idx); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } }; } @@ -292,7 +308,7 @@ fn append_map_decimal_decimal_element( map: &SparkUnsafeMap, key_precision: u8, value_precision: u8, -) { +) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -310,7 +326,9 @@ fn append_map_decimal_decimal_element( value_builder.append_value(values.get_decimal(idx, value_precision)); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } fn append_map_decimal_struct_element( @@ -318,7 +336,7 @@ fn append_map_decimal_struct_element( map: &SparkUnsafeMap, precision: u8, fields: &Fields, -) { +) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -330,17 +348,21 @@ fn append_map_decimal_struct_element( key_builder.append_value(keys.get_decimal(idx, precision)); let value_builder = downcast_builder_ref!(StructBuilder, map_builder.values()); - if values.is_null_at(idx) { + let nested_row = if values.is_null_at(idx) { value_builder.append_null(); + SparkUnsafeRow::default() } else { - let nested_row = values.get_struct(idx, fields.len()); value_builder.append(true); - for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), value_builder, &nested_row, field_idx); - } + values.get_struct(idx, fields.len()) + }; + + for (field_idx, field) in fields.into_iter().enumerate() { + append_field(field.data_type(), value_builder, &nested_row, field_idx)?; } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } fn append_map_struct_decimal_element( @@ -348,7 +370,7 @@ fn append_map_struct_decimal_element( map: &SparkUnsafeMap, precision: u8, fields: &Fields, -) { +) -> Result<(), CometError> { let keys = &map.keys; let values = &map.values; @@ -360,7 +382,7 @@ fn append_map_struct_decimal_element( let nested_row = values.get_struct(idx, fields.len()); key_builder.append(true); for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), key_builder, &nested_row, field_idx); + append_field(field.data_type(), key_builder, &nested_row, field_idx)?; } let value_builder = downcast_builder_ref!(Decimal128Builder, map_builder.values()); @@ -370,7 +392,9 @@ fn append_map_struct_decimal_element( value_builder.append_value(values.get_decimal(idx, precision)); } } - map_builder.append(true).unwrap(); + map_builder.append(true)?; + + Ok(()) } // Boolean key @@ -1912,601 +1936,601 @@ pub fn append_map_elements( map_builder: &mut MapBuilder, map: &SparkUnsafeMap, ) -> Result<(), CometError> { - let (key_dt, value_dt, _) = get_map_key_value_dt(field).unwrap(); + let (key_dt, value_dt, _) = get_map_key_value_dt(field)?; // macro cannot expand to match arm match (key_dt, value_dt) { (DataType::Boolean, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_boolean_element(map_builder, map); + append_map_boolean_boolean_element(map_builder, map)?; } (DataType::Boolean, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_int8_element(map_builder, map); + append_map_boolean_int8_element(map_builder, map)?; } (DataType::Boolean, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_int16_element(map_builder, map); + append_map_boolean_int16_element(map_builder, map)?; } (DataType::Boolean, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_int32_element(map_builder, map); + append_map_boolean_int32_element(map_builder, map)?; } (DataType::Boolean, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_int64_element(map_builder, map); + append_map_boolean_int64_element(map_builder, map)?; } (DataType::Boolean, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_float32_element(map_builder, map); + append_map_boolean_float32_element(map_builder, map)?; } (DataType::Boolean, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_float64_element(map_builder, map); + append_map_boolean_float64_element(map_builder, map)?; } (DataType::Boolean, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_date32_element(map_builder, map); + append_map_boolean_date32_element(map_builder, map)?; } (DataType::Boolean, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_boolean_timestamp_element(map_builder, map); + append_map_boolean_timestamp_element(map_builder, map)?; } (DataType::Boolean, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_binary_element(map_builder, map); + append_map_boolean_binary_element(map_builder, map)?; } (DataType::Boolean, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_string_element(map_builder, map); + append_map_boolean_string_element(map_builder, map)?; } (DataType::Boolean, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_decimal_element(map_builder, map, *p); + append_map_boolean_decimal_element(map_builder, map, *p)?; } (DataType::Boolean, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_boolean_struct_element(map_builder, map, fields); + append_map_boolean_struct_element(map_builder, map, fields)?; } (DataType::Int8, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_boolean_element(map_builder, map); + append_map_int8_boolean_element(map_builder, map)?; } (DataType::Int8, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_int8_element(map_builder, map); + append_map_int8_int8_element(map_builder, map)?; } (DataType::Int8, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_int16_element(map_builder, map); + append_map_int8_int16_element(map_builder, map)?; } (DataType::Int8, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_int32_element(map_builder, map); + append_map_int8_int32_element(map_builder, map)?; } (DataType::Int8, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_int64_element(map_builder, map); + append_map_int8_int64_element(map_builder, map)?; } (DataType::Int8, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_float32_element(map_builder, map); + append_map_int8_float32_element(map_builder, map)?; } (DataType::Int8, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_float64_element(map_builder, map); + append_map_int8_float64_element(map_builder, map)?; } (DataType::Int8, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_date32_element(map_builder, map); + append_map_int8_date32_element(map_builder, map)?; } (DataType::Int8, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_int8_timestamp_element(map_builder, map); + append_map_int8_timestamp_element(map_builder, map)?; } (DataType::Int8, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_binary_element(map_builder, map); + append_map_int8_binary_element(map_builder, map)?; } (DataType::Int8, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_string_element(map_builder, map); + append_map_int8_string_element(map_builder, map)?; } (DataType::Int8, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_decimal_element(map_builder, map, *p); + append_map_int8_decimal_element(map_builder, map, *p)?; } (DataType::Int8, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int8_struct_element(map_builder, map, fields); + append_map_int8_struct_element(map_builder, map, fields)?; } (DataType::Int16, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_boolean_element(map_builder, map); + append_map_int16_boolean_element(map_builder, map)?; } (DataType::Int16, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_int8_element(map_builder, map); + append_map_int16_int8_element(map_builder, map)?; } (DataType::Int16, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_int16_element(map_builder, map); + append_map_int16_int16_element(map_builder, map)?; } (DataType::Int16, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_int32_element(map_builder, map); + append_map_int16_int32_element(map_builder, map)?; } (DataType::Int16, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_int64_element(map_builder, map); + append_map_int16_int64_element(map_builder, map)?; } (DataType::Int16, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_float32_element(map_builder, map); + append_map_int16_float32_element(map_builder, map)?; } (DataType::Int16, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_float64_element(map_builder, map); + append_map_int16_float64_element(map_builder, map)?; } (DataType::Int16, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_date32_element(map_builder, map); + append_map_int16_date32_element(map_builder, map)?; } (DataType::Int16, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_int16_timestamp_element(map_builder, map); + append_map_int16_timestamp_element(map_builder, map)?; } (DataType::Int16, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_binary_element(map_builder, map); + append_map_int16_binary_element(map_builder, map)?; } (DataType::Int16, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_string_element(map_builder, map); + append_map_int16_string_element(map_builder, map)?; } (DataType::Int16, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_decimal_element(map_builder, map, *p); + append_map_int16_decimal_element(map_builder, map, *p)?; } (DataType::Int16, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int16_struct_element(map_builder, map, fields); + append_map_int16_struct_element(map_builder, map, fields)?; } (DataType::Int32, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_boolean_element(map_builder, map); + append_map_int32_boolean_element(map_builder, map)?; } (DataType::Int32, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_int8_element(map_builder, map); + append_map_int32_int8_element(map_builder, map)?; } (DataType::Int32, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_int16_element(map_builder, map); + append_map_int32_int16_element(map_builder, map)?; } (DataType::Int32, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_int32_element(map_builder, map); + append_map_int32_int32_element(map_builder, map)?; } (DataType::Int32, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_int64_element(map_builder, map); + append_map_int32_int64_element(map_builder, map)?; } (DataType::Int32, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_float32_element(map_builder, map); + append_map_int32_float32_element(map_builder, map)?; } (DataType::Int32, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_float64_element(map_builder, map); + append_map_int32_float64_element(map_builder, map)?; } (DataType::Int32, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_date32_element(map_builder, map); + append_map_int32_date32_element(map_builder, map)?; } (DataType::Int32, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_int32_timestamp_element(map_builder, map); + append_map_int32_timestamp_element(map_builder, map)?; } (DataType::Int32, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_binary_element(map_builder, map); + append_map_int32_binary_element(map_builder, map)?; } (DataType::Int32, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_string_element(map_builder, map); + append_map_int32_string_element(map_builder, map)?; } (DataType::Int32, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_decimal_element(map_builder, map, *p); + append_map_int32_decimal_element(map_builder, map, *p)?; } (DataType::Int32, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int32_struct_element(map_builder, map, fields); + append_map_int32_struct_element(map_builder, map, fields)?; } (DataType::Int64, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_boolean_element(map_builder, map); + append_map_int64_boolean_element(map_builder, map)?; } (DataType::Int64, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_int8_element(map_builder, map); + append_map_int64_int8_element(map_builder, map)?; } (DataType::Int64, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_int16_element(map_builder, map); + append_map_int64_int16_element(map_builder, map)?; } (DataType::Int64, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_int32_element(map_builder, map); + append_map_int64_int32_element(map_builder, map)?; } (DataType::Int64, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_int64_element(map_builder, map); + append_map_int64_int64_element(map_builder, map)?; } (DataType::Int64, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_float32_element(map_builder, map); + append_map_int64_float32_element(map_builder, map)?; } (DataType::Int64, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_float64_element(map_builder, map); + append_map_int64_float64_element(map_builder, map)?; } (DataType::Int64, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_date32_element(map_builder, map); + append_map_int64_date32_element(map_builder, map)?; } (DataType::Int64, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_int64_timestamp_element(map_builder, map); + append_map_int64_timestamp_element(map_builder, map)?; } (DataType::Int64, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_binary_element(map_builder, map); + append_map_int64_binary_element(map_builder, map)?; } (DataType::Int64, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_string_element(map_builder, map); + append_map_int64_string_element(map_builder, map)?; } (DataType::Int64, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_decimal_element(map_builder, map, *p); + append_map_int64_decimal_element(map_builder, map, *p)?; } (DataType::Int64, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_int64_struct_element(map_builder, map, fields); + append_map_int64_struct_element(map_builder, map, fields)?; } (DataType::Float32, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_boolean_element(map_builder, map); + append_map_float32_boolean_element(map_builder, map)?; } (DataType::Float32, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_int8_element(map_builder, map); + append_map_float32_int8_element(map_builder, map)?; } (DataType::Float32, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_int16_element(map_builder, map); + append_map_float32_int16_element(map_builder, map)?; } (DataType::Float32, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_int32_element(map_builder, map); + append_map_float32_int32_element(map_builder, map)?; } (DataType::Float32, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_int64_element(map_builder, map); + append_map_float32_int64_element(map_builder, map)?; } (DataType::Float32, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_float32_element(map_builder, map); + append_map_float32_float32_element(map_builder, map)?; } (DataType::Float32, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_float64_element(map_builder, map); + append_map_float32_float64_element(map_builder, map)?; } (DataType::Float32, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_date32_element(map_builder, map); + append_map_float32_date32_element(map_builder, map)?; } (DataType::Float32, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_float32_timestamp_element(map_builder, map); + append_map_float32_timestamp_element(map_builder, map)?; } (DataType::Float32, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_binary_element(map_builder, map); + append_map_float32_binary_element(map_builder, map)?; } (DataType::Float32, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_string_element(map_builder, map); + append_map_float32_string_element(map_builder, map)?; } (DataType::Float32, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_decimal_element(map_builder, map, *p); + append_map_float32_decimal_element(map_builder, map, *p)?; } (DataType::Float32, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float32_struct_element(map_builder, map, fields); + append_map_float32_struct_element(map_builder, map, fields)?; } (DataType::Float64, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_boolean_element(map_builder, map); + append_map_float64_boolean_element(map_builder, map)?; } (DataType::Float64, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_int8_element(map_builder, map); + append_map_float64_int8_element(map_builder, map)?; } (DataType::Float64, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_int16_element(map_builder, map); + append_map_float64_int16_element(map_builder, map)?; } (DataType::Float64, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_int32_element(map_builder, map); + append_map_float64_int32_element(map_builder, map)?; } (DataType::Float64, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_int64_element(map_builder, map); + append_map_float64_int64_element(map_builder, map)?; } (DataType::Float64, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_float32_element(map_builder, map); + append_map_float64_float32_element(map_builder, map)?; } (DataType::Float64, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_float64_element(map_builder, map); + append_map_float64_float64_element(map_builder, map)?; } (DataType::Float64, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_date32_element(map_builder, map); + append_map_float64_date32_element(map_builder, map)?; } (DataType::Float64, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_float64_timestamp_element(map_builder, map); + append_map_float64_timestamp_element(map_builder, map)?; } (DataType::Float64, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_binary_element(map_builder, map); + append_map_float64_binary_element(map_builder, map)?; } (DataType::Float64, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_string_element(map_builder, map); + append_map_float64_string_element(map_builder, map)?; } (DataType::Float64, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_decimal_element(map_builder, map, *p); + append_map_float64_decimal_element(map_builder, map, *p)?; } (DataType::Float64, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_float64_struct_element(map_builder, map, fields); + append_map_float64_struct_element(map_builder, map, fields)?; } (DataType::Date32, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_boolean_element(map_builder, map); + append_map_date32_boolean_element(map_builder, map)?; } (DataType::Date32, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_int8_element(map_builder, map); + append_map_date32_int8_element(map_builder, map)?; } (DataType::Date32, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_int16_element(map_builder, map); + append_map_date32_int16_element(map_builder, map)?; } (DataType::Date32, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_int32_element(map_builder, map); + append_map_date32_int32_element(map_builder, map)?; } (DataType::Date32, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_int64_element(map_builder, map); + append_map_date32_int64_element(map_builder, map)?; } (DataType::Date32, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_float32_element(map_builder, map); + append_map_date32_float32_element(map_builder, map)?; } (DataType::Date32, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_float64_element(map_builder, map); + append_map_date32_float64_element(map_builder, map)?; } (DataType::Date32, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_date32_element(map_builder, map); + append_map_date32_date32_element(map_builder, map)?; } (DataType::Date32, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_date32_timestamp_element(map_builder, map); + append_map_date32_timestamp_element(map_builder, map)?; } (DataType::Date32, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_binary_element(map_builder, map); + append_map_date32_binary_element(map_builder, map)?; } (DataType::Date32, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_string_element(map_builder, map); + append_map_date32_string_element(map_builder, map)?; } (DataType::Date32, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_decimal_element(map_builder, map, *p); + append_map_date32_decimal_element(map_builder, map, *p)?; } (DataType::Date32, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_date32_struct_element(map_builder, map, fields); + append_map_date32_struct_element(map_builder, map, fields)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Boolean) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_boolean_element(map_builder, map); + append_map_timestamp_boolean_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Int8) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_int8_element(map_builder, map); + append_map_timestamp_int8_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Int16) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_int16_element(map_builder, map); + append_map_timestamp_int16_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Int32) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_int32_element(map_builder, map); + append_map_timestamp_int32_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Int64) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_int64_element(map_builder, map); + append_map_timestamp_int64_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Float32) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_float32_element(map_builder, map); + append_map_timestamp_float32_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Float64) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_float64_element(map_builder, map); + append_map_timestamp_float64_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Date32) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_date32_element(map_builder, map); + append_map_timestamp_date32_element(map_builder, map)?; } ( DataType::Timestamp(TimeUnit::Microsecond, _), @@ -2516,296 +2540,296 @@ pub fn append_map_elements( MapBuilder, map_builder ); - append_map_timestamp_timestamp_element(map_builder, map); + append_map_timestamp_timestamp_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Binary) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_binary_element(map_builder, map); + append_map_timestamp_binary_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Utf8) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_timestamp_string_element(map_builder, map); + append_map_timestamp_string_element(map_builder, map)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_timestamp_decimal_element(map_builder, map, *p); + append_map_timestamp_decimal_element(map_builder, map, *p)?; } (DataType::Timestamp(TimeUnit::Microsecond, _), DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_timestamp_struct_element(map_builder, map, fields); + append_map_timestamp_struct_element(map_builder, map, fields)?; } (DataType::Binary, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_boolean_element(map_builder, map); + append_map_binary_boolean_element(map_builder, map)?; } (DataType::Binary, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_int8_element(map_builder, map); + append_map_binary_int8_element(map_builder, map)?; } (DataType::Binary, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_int16_element(map_builder, map); + append_map_binary_int16_element(map_builder, map)?; } (DataType::Binary, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_int32_element(map_builder, map); + append_map_binary_int32_element(map_builder, map)?; } (DataType::Binary, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_int64_element(map_builder, map); + append_map_binary_int64_element(map_builder, map)?; } (DataType::Binary, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_float32_element(map_builder, map); + append_map_binary_float32_element(map_builder, map)?; } (DataType::Binary, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_float64_element(map_builder, map); + append_map_binary_float64_element(map_builder, map)?; } (DataType::Binary, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_date32_element(map_builder, map); + append_map_binary_date32_element(map_builder, map)?; } (DataType::Binary, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_binary_timestamp_element(map_builder, map); + append_map_binary_timestamp_element(map_builder, map)?; } (DataType::Binary, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_binary_element(map_builder, map); + append_map_binary_binary_element(map_builder, map)?; } (DataType::Binary, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_string_element(map_builder, map); + append_map_binary_string_element(map_builder, map)?; } (DataType::Binary, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_decimal_element(map_builder, map, *p); + append_map_binary_decimal_element(map_builder, map, *p)?; } (DataType::Binary, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_binary_struct_element(map_builder, map, fields); + append_map_binary_struct_element(map_builder, map, fields)?; } (DataType::Utf8, DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_boolean_element(map_builder, map); + append_map_string_boolean_element(map_builder, map)?; } (DataType::Utf8, DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_int8_element(map_builder, map); + append_map_string_int8_element(map_builder, map)?; } (DataType::Utf8, DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_int16_element(map_builder, map); + append_map_string_int16_element(map_builder, map)?; } (DataType::Utf8, DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_int32_element(map_builder, map); + append_map_string_int32_element(map_builder, map)?; } (DataType::Utf8, DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_int64_element(map_builder, map); + append_map_string_int64_element(map_builder, map)?; } (DataType::Utf8, DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_float32_element(map_builder, map); + append_map_string_float32_element(map_builder, map)?; } (DataType::Utf8, DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_float64_element(map_builder, map); + append_map_string_float64_element(map_builder, map)?; } (DataType::Utf8, DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_date32_element(map_builder, map); + append_map_string_date32_element(map_builder, map)?; } (DataType::Utf8, DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_string_timestamp_element(map_builder, map); + append_map_string_timestamp_element(map_builder, map)?; } (DataType::Utf8, DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_binary_element(map_builder, map); + append_map_string_binary_element(map_builder, map)?; } (DataType::Utf8, DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_string_element(map_builder, map); + append_map_string_string_element(map_builder, map)?; } (DataType::Utf8, DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_decimal_element(map_builder, map, *p); + append_map_string_decimal_element(map_builder, map, *p)?; } (DataType::Utf8, DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_string_struct_element(map_builder, map, fields); + append_map_string_struct_element(map_builder, map, fields)?; } (DataType::Decimal128(p, _), DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_boolean_element(map_builder, map, *p); + append_map_decimal_boolean_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_int8_element(map_builder, map, *p); + append_map_decimal_int8_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_int16_element(map_builder, map, *p); + append_map_decimal_int16_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_int32_element(map_builder, map, *p); + append_map_decimal_int32_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_int64_element(map_builder, map, *p); + append_map_decimal_int64_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_float32_element(map_builder, map, *p); + append_map_decimal_float32_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_float64_element(map_builder, map, *p); + append_map_decimal_float64_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_date32_element(map_builder, map, *p); + append_map_decimal_date32_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_decimal_timestamp_element(map_builder, map, *p); + append_map_decimal_timestamp_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_binary_element(map_builder, map, *p); + append_map_decimal_binary_element(map_builder, map, *p)?; } (DataType::Decimal128(p, _), DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_string_element(map_builder, map, *p); + append_map_decimal_string_element(map_builder, map, *p)?; } (DataType::Decimal128(p1, _), DataType::Decimal128(p2, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_decimal_element(map_builder, map, *p1, *p2); + append_map_decimal_decimal_element(map_builder, map, *p1, *p2)?; } (DataType::Decimal128(p, _), DataType::Struct(fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_decimal_struct_element(map_builder, map, *p, fields); + append_map_decimal_struct_element(map_builder, map, *p, fields)?; } (DataType::Struct(fields), DataType::Boolean) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_boolean_element(map_builder, map, fields); + append_map_struct_boolean_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Int8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_int8_element(map_builder, map, fields); + append_map_struct_int8_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Int16) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_int16_element(map_builder, map, fields); + append_map_struct_int16_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Int32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_int32_element(map_builder, map, fields); + append_map_struct_int32_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Int64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_int64_element(map_builder, map, fields); + append_map_struct_int64_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Float32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_float32_element(map_builder, map, fields); + append_map_struct_float32_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Float64) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_float64_element(map_builder, map, fields); + append_map_struct_float64_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Date32) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_date32_element(map_builder, map, fields); + append_map_struct_date32_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Timestamp(TimeUnit::Microsecond, _)) => { let map_builder = downcast_builder_ref!( MapBuilder, map_builder ); - append_map_struct_timestamp_element(map_builder, map, fields); + append_map_struct_timestamp_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Binary) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_binary_element(map_builder, map, fields); + append_map_struct_binary_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Utf8) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_string_element(map_builder, map, fields); + append_map_struct_string_element(map_builder, map, fields)?; } (DataType::Struct(fields), DataType::Decimal128(p, _)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_decimal_element(map_builder, map, *p, fields); + append_map_struct_decimal_element(map_builder, map, *p, fields)?; } (DataType::Struct(key_fields), DataType::Struct(value_fields)) => { let map_builder = downcast_builder_ref!(MapBuilder, map_builder); - append_map_struct_struct_element(map_builder, map, key_fields, value_fields); + append_map_struct_struct_element(map_builder, map, key_fields, value_fields)?; } _ => { return Err(CometError::Internal(format!( diff --git a/core/src/execution/shuffle/row.rs b/core/src/execution/shuffle/row.rs index d1f73d2a6..36a7b2424 100644 --- a/core/src/execution/shuffle/row.rs +++ b/core/src/execution/shuffle/row.rs @@ -196,6 +196,16 @@ impl SparkUnsafeObject for SparkUnsafeRow { } } +impl Default for SparkUnsafeRow { + fn default() -> Self { + Self { + row_addr: -1, + row_size: -1, + row_bitset_width: -1, + } + } +} + impl SparkUnsafeRow { fn new(schema: &Vec) -> Self { Self { @@ -205,6 +215,11 @@ impl SparkUnsafeRow { } } + /// Returns true if the row is a null row. + pub fn is_null_row(&self) -> bool { + self.row_addr == -1 && self.row_size == -1 && self.row_bitset_width == -1 + } + /// Calculate the width of the bitset for the row in bytes. /// The logic is from Spark `UnsafeRow.calculateBitSetWidthInBytes`. #[inline] @@ -278,24 +293,34 @@ pub(crate) use downcast_builder_ref; /// Appends field of row to the given struct builder. `dt` is the data type of the field. /// `struct_builder` is the struct builder of the row. `row` is the row that contains the field. -/// `idx` is the index of the field in the row. +/// `idx` is the index of the field in the row. The caller is responsible for ensuring that the +/// `struct_builder.append` is called before/after calling this function to append the null buffer +/// of the struct array. #[allow(clippy::redundant_closure_call)] pub(crate) fn append_field( dt: &DataType, struct_builder: &mut StructBuilder, row: &SparkUnsafeRow, idx: usize, -) { +) -> Result<(), CometError> { /// A macro for generating code of appending value into field builder of Arrow struct builder. macro_rules! append_field_to_builder { ($builder_type:ty, $accessor:expr) => {{ let field_builder = struct_builder.field_builder::<$builder_type>(idx).unwrap(); - let is_null = row.is_null_at(idx); - if is_null { + if row.is_null_row() { + // The row is null. field_builder.append_null(); } else { - $accessor(field_builder); + let is_null = row.is_null_at(idx); + + if is_null { + // The field in the row is null. + // Append a null value to the field builder. + field_builder.append_null(); + } else { + $accessor(field_builder); + } } }}; } @@ -307,17 +332,25 @@ pub(crate) fn append_field( let field_builder = struct_builder .field_builder::>(idx) .unwrap(); - let is_null = row.is_null_at(idx); - if is_null { - field_builder.append(false).unwrap(); + if row.is_null_row() { + // The row is null. + field_builder.append(false)?; } else { - append_map_elements::<$key_builder_type, $value_builder_type>( - $field, - field_builder, - &row.get_map(idx), - ) - .unwrap(); + let is_null = row.is_null_at(idx); + + if is_null { + // The field in the row is null. + // Append a null value to the map builder. + field_builder.append(false)?; + } else { + append_map_elements::<$key_builder_type, $value_builder_type>( + $field, + field_builder, + &row.get_map(idx), + ) + .unwrap(); + } } }}; } @@ -329,17 +362,25 @@ pub(crate) fn append_field( let field_builder = struct_builder .field_builder::>(idx) .unwrap(); - let is_null = row.is_null_at(idx); - if is_null { + if row.is_null_row() { + // The row is null. field_builder.append_null(); } else { - append_list_element::<$builder_type>( - $element_dt, - field_builder, - &row.get_array(idx), - ) - .unwrap() + let is_null = row.is_null_at(idx); + + if is_null { + // The field in the row is null. + // Append a null value to the list builder. + field_builder.append_null(); + } else { + append_list_element::<$builder_type>( + $element_dt, + field_builder, + &row.get_array(idx), + ) + .unwrap() + } } }}; } @@ -397,13 +438,29 @@ pub(crate) fn append_field( .append_value(row.get_decimal(idx, *p))); } DataType::Struct(fields) => { - append_field_to_builder!(StructBuilder, |builder: &mut StructBuilder| { - let nested_row = row.get_struct(idx, fields.len()); - builder.append(true); + // Appending value into struct field builder of Arrow struct builder. + let field_builder = struct_builder.field_builder::(idx).unwrap(); + + if row.is_null_row() { + // The row is null. + field_builder.append_null(); + } else { + let is_null = row.is_null_at(idx); + + let nested_row = if is_null { + // The field in the row is null, i.e., a null nested row. + // Append a null value to the row builder. + field_builder.append_null(); + SparkUnsafeRow::default() + } else { + field_builder.append(true); + row.get_struct(idx, fields.len()) + }; + for (field_idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), builder, &nested_row, field_idx); + append_field(field.data_type(), field_builder, &nested_row, field_idx)?; } - }); + } } DataType::Map(field, _) => { let (key_dt, value_dt, _) = get_map_key_value_dt(field).unwrap(); @@ -978,9 +1035,11 @@ pub(crate) fn append_field( unreachable!("Unsupported data type of struct field: {:?}", dt) } } + + Ok(()) } -/// Appends column of rows to the given array builder. +/// Appends column of top rows to the given array builder. #[allow(clippy::redundant_closure_call, clippy::too_many_arguments)] pub(crate) fn append_columns( row_addresses_ptr: *mut jlong, @@ -995,7 +1054,7 @@ pub(crate) fn append_columns( /// A macro for generating code of appending values into Arrow array builders. macro_rules! append_column_to_builder { ($builder_type:ty, $accessor:expr) => {{ - let builder = builder + let element_builder = builder .as_any_mut() .downcast_mut::<$builder_type>() .unwrap(); @@ -1009,9 +1068,11 @@ pub(crate) fn append_columns( let is_null = row.is_null_at(column_idx); if is_null { - builder.append_null(); + // The element value is null. + // Append a null value to the element builder. + element_builder.append_null(); } else { - $accessor(builder, &row, column_idx); + $accessor(element_builder, &row, column_idx); } } }}; @@ -1020,7 +1081,7 @@ pub(crate) fn append_columns( /// A macro for generating code of appending values into Arrow `ListBuilder`. macro_rules! append_column_to_list_builder { ($builder_type:ty, $element_dt:expr) => {{ - let builder = builder + let list_builder = builder .as_any_mut() .downcast_mut::>() .unwrap(); @@ -1034,11 +1095,13 @@ pub(crate) fn append_columns( let is_null = row.is_null_at(column_idx); if is_null { - builder.append_null(); + // The list is null. + // Append a null value to the list builder. + list_builder.append_null(); } else { append_list_element::<$builder_type>( $element_dt, - builder, + list_builder, &row.get_array(column_idx), ) .unwrap() @@ -1050,7 +1113,7 @@ pub(crate) fn append_columns( /// A macro for generating code of appending values into Arrow `MapBuilder`. macro_rules! append_column_to_map_builder { ($key_builder_type:ty, $value_builder_type:ty, $field:expr) => {{ - let builder = builder + let map_builder = builder .as_any_mut() .downcast_mut::>() .unwrap(); @@ -1064,11 +1127,13 @@ pub(crate) fn append_columns( let is_null = row.is_null_at(column_idx); if is_null { - builder.append(false)?; + // The map is null. + // Append a null value to the map builder. + map_builder.append(false)?; } else { append_map_elements::<$key_builder_type, $value_builder_type>( $field, - builder, + map_builder, &row.get_map(column_idx), ) .unwrap() @@ -1077,6 +1142,39 @@ pub(crate) fn append_columns( }}; } + /// A macro for generating code of appending values into Arrow `StructBuilder`. + macro_rules! append_column_to_struct_builder { + ($fields:expr) => {{ + let struct_builder = builder + .as_any_mut() + .downcast_mut::() + .unwrap(); + let mut row = SparkUnsafeRow::new(schema); + + for i in row_start..row_end { + let row_addr = unsafe { *row_addresses_ptr.add(i) }; + let row_size = unsafe { *row_sizes_ptr.add(i) }; + row.point_to(row_addr, row_size); + + let is_null = row.is_null_at(column_idx); + + let nested_row = if is_null { + // The struct is null. + // Append a null value to the struct builder and field builders. + struct_builder.append_null(); + SparkUnsafeRow::default() + } else { + struct_builder.append(true); + row.get_struct(column_idx, $fields.len()) + }; + + for (idx, field) in $fields.into_iter().enumerate() { + append_field(field.data_type(), struct_builder, &nested_row, idx)?; + } + } + }}; + } + let dt = &schema[column_idx]; match dt { @@ -1709,16 +1807,7 @@ pub(crate) fn append_columns( _ => unreachable!("Unsupported data type of list element: {:?}", dt), }, DataType::Struct(fields) => { - append_column_to_builder!( - StructBuilder, - |builder: &mut StructBuilder, row: &SparkUnsafeRow, idx| { - let nested_row = row.get_struct(idx, fields.len()); - builder.append(true); - for (idx, field) in fields.into_iter().enumerate() { - append_field(field.data_type(), builder, &nested_row, idx); - } - } - ); + append_column_to_struct_builder!(fields); } _ => { unreachable!("Unsupported data type of column: {:?}", dt) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala index a7aad2b0e..ac5664911 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala @@ -63,6 +63,55 @@ abstract class CometShuffleSuiteBase extends CometTestBase with AdaptiveSparkPla import testImplicits._ + test("columnar shuffle on nested struct including nulls") { + Seq(10, 201).foreach { numPartitions => + Seq("1.0", "10.0").foreach { ratio => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "false", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + withParquetTable( + (0 until 50).map(i => + (i, Seq((i + 1, i.toString), null, (i + 3, (i + 3).toString)), i + 1)), + "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_1" > 1) + .repartition(numPartitions, $"_1", $"_2", $"_3") + .sortWithinPartitions($"_1") + + checkSparkAnswer(df) + checkCometExchange(df, 1, false) + } + } + } + } + } + + test("columnar shuffle on struct including nulls") { + Seq(10, 201).foreach { numPartitions => + Seq("1.0", "10.0").foreach { ratio => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> "false", + CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) { + val data: Seq[(Int, (Int, String))] = + Seq((1, (0, "1")), (2, (3, "3")), (3, null)) + withParquetTable(data, "tbl") { + val df = sql("SELECT * FROM tbl") + .filter($"_1" > 1) + .repartition(numPartitions, $"_1", $"_2") + .sortWithinPartitions($"_1") + + checkSparkAnswer(df) + checkCometExchange(df, 1, false) + } + } + } + } + } + test("RoundRobinPartitioning is supported by columnar shuffle") { withSQLConf( // AQE has `ShuffleStage` which is a leaf node which blocks