diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index a4ad53a2b..9c5ffcbe0 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -457,7 +457,7 @@ impl<'a> BatchReader<'a> { let null_buffer = self.read_null_buffer(); Arc::new(Date32Array::try_new(data_buffer, null_buffer)?) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { + DataType::Timestamp(TimeUnit::Microsecond, None) => { let buffer = self.read_buffer(); let data_buffer = ScalarBuffer::::from(buffer); let null_buffer = self.read_null_buffer(); @@ -466,6 +466,14 @@ impl<'a> BatchReader<'a> { null_buffer, )?) } + DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { + let buffer = self.read_buffer(); + let data_buffer = ScalarBuffer::::from(buffer); + let null_buffer = self.read_null_buffer(); + Arc::new( + TimestampMicrosecondArray::try_new(data_buffer, null_buffer)?.with_timezone(tz), + ) + } DataType::Decimal128(p, s) => { let buffer = self.read_buffer(); let data_buffer = ScalarBuffer::::from(buffer); @@ -543,7 +551,7 @@ impl<'a> BatchReader<'a> { } }; match data_type { - DataType::Dictionary(_, _) | DataType::Timestamp(_, _) => { + DataType::Dictionary(_, _) | DataType::Timestamp(_, Some(_)) => { // no need to increment } DataType::Decimal128(_, _) => self.offset += 3, @@ -621,6 +629,7 @@ mod test { use super::*; use arrow_array::builder::{ BooleanBuilder, Date32Builder, Decimal128Builder, Int32Builder, StringDictionaryBuilder, + TimestampMicrosecondBuilder, }; use std::sync::Arc; @@ -632,7 +641,6 @@ mod test { writer.write_partial_schema(&batch.schema()).unwrap(); writer.write_batch(&batch).unwrap(); let buffer = writer.inner(); - // assert_eq!(421203, buffer.len()); let mut reader = BatchReader::new(&buffer); let batch2 = reader.read_batch().unwrap(); @@ -641,15 +649,21 @@ mod test { fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch { let schema = Arc::new(Schema::new(vec![ - Field::new("c0", DataType::Int32, true), + Field::new("a", DataType::Int32, true), Field::new( - "c1", + "b", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true, ), - Field::new("c2", DataType::Date32, true), - Field::new("c3", DataType::Decimal128(11, 2), true), - Field::new("c4", DataType::Boolean, true), + Field::new("c", DataType::Date32, true), + Field::new("d", DataType::Decimal128(11, 2), true), + Field::new("e", DataType::Boolean, true), + Field::new("f", DataType::Timestamp(TimeUnit::Microsecond, None), true), + Field::new( + "g", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + true, + ), ])); let mut a = Int32Builder::new(); let mut b = StringDictionaryBuilder::new(); @@ -657,25 +671,33 @@ mod test { let mut d = Decimal128Builder::new() .with_precision_and_scale(11, 2) .unwrap(); - let mut c4_bool = BooleanBuilder::with_capacity(num_rows); + let mut e = BooleanBuilder::with_capacity(num_rows); + let mut f = TimestampMicrosecondBuilder::with_capacity(num_rows); + let mut g = TimestampMicrosecondBuilder::with_capacity(num_rows).with_timezone("UTC"); for i in 0..num_rows { a.append_value(i as i32); c.append_value(i as i32); d.append_value((i * 1000000) as i128); if allow_nulls && i % 10 == 0 { b.append_null(); - c4_bool.append_null(); + e.append_null(); + f.append_null(); + g.append_null(); } else { // test for dictionary-encoded strings b.append_value("this string is repeated a lot"); - c4_bool.append_value(i % 2 == 0) + e.append_value(i % 2 == 0); + f.append_value((i * 100000000) as i64); + g.append_value((i * 100000000) as i64); } } let a = a.finish(); let b: DictionaryArray = b.finish(); let c = c.finish(); let d = d.finish(); - let c4_bool = c4_bool.finish(); + let e = e.finish(); + let f = f.finish(); + let g = g.finish(); RecordBatch::try_new( Arc::clone(&schema), vec![ @@ -683,7 +705,9 @@ mod test { Arc::new(b), Arc::new(c), Arc::new(d), - Arc::new(c4_bool), + Arc::new(e), + Arc::new(f), + Arc::new(g), ], ) .unwrap()