Skip to content

Commit

Permalink
timestamp tests + fix
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 30, 2024
1 parent 34dd489 commit 874a02f
Showing 1 changed file with 37 additions and 13 deletions.
50 changes: 37 additions & 13 deletions native/core/src/execution/shuffle/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl<'a> BatchReader<'a> {
let null_buffer = self.read_null_buffer();
Arc::new(Date32Array::try_new(data_buffer, null_buffer)?)
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
DataType::Timestamp(TimeUnit::Microsecond, None) => {
let buffer = self.read_buffer();
let data_buffer = ScalarBuffer::<i64>::from(buffer);
let null_buffer = self.read_null_buffer();
Expand All @@ -466,6 +466,14 @@ impl<'a> BatchReader<'a> {
null_buffer,
)?)
}
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
let buffer = self.read_buffer();
let data_buffer = ScalarBuffer::<i64>::from(buffer);
let null_buffer = self.read_null_buffer();
Arc::new(
TimestampMicrosecondArray::try_new(data_buffer, null_buffer)?.with_timezone(tz),
)
}
DataType::Decimal128(p, s) => {
let buffer = self.read_buffer();
let data_buffer = ScalarBuffer::<i128>::from(buffer);
Expand Down Expand Up @@ -543,7 +551,7 @@ impl<'a> BatchReader<'a> {
}
};
match data_type {
DataType::Dictionary(_, _) | DataType::Timestamp(_, _) => {
DataType::Dictionary(_, _) | DataType::Timestamp(_, Some(_)) => {
// no need to increment
}
DataType::Decimal128(_, _) => self.offset += 3,
Expand Down Expand Up @@ -621,6 +629,7 @@ mod test {
use super::*;
use arrow_array::builder::{
BooleanBuilder, Date32Builder, Decimal128Builder, Int32Builder, StringDictionaryBuilder,
TimestampMicrosecondBuilder,
};
use std::sync::Arc;

Expand All @@ -632,7 +641,6 @@ mod test {
writer.write_partial_schema(&batch.schema()).unwrap();
writer.write_batch(&batch).unwrap();
let buffer = writer.inner();
// assert_eq!(421203, buffer.len());

let mut reader = BatchReader::new(&buffer);
let batch2 = reader.read_batch().unwrap();
Expand All @@ -641,49 +649,65 @@ mod test {

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::Int32, true),
Field::new("a", DataType::Int32, true),
Field::new(
"c1",
"b",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
),
Field::new("c2", DataType::Date32, true),
Field::new("c3", DataType::Decimal128(11, 2), true),
Field::new("c4", DataType::Boolean, true),
Field::new("c", DataType::Date32, true),
Field::new("d", DataType::Decimal128(11, 2), true),
Field::new("e", DataType::Boolean, true),
Field::new("f", DataType::Timestamp(TimeUnit::Microsecond, None), true),
Field::new(
"g",
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
true,
),
]));
let mut a = Int32Builder::new();
let mut b = StringDictionaryBuilder::new();
let mut c = Date32Builder::new();
let mut d = Decimal128Builder::new()
.with_precision_and_scale(11, 2)
.unwrap();
let mut c4_bool = BooleanBuilder::with_capacity(num_rows);
let mut e = BooleanBuilder::with_capacity(num_rows);
let mut f = TimestampMicrosecondBuilder::with_capacity(num_rows);
let mut g = TimestampMicrosecondBuilder::with_capacity(num_rows).with_timezone("UTC");
for i in 0..num_rows {
a.append_value(i as i32);
c.append_value(i as i32);
d.append_value((i * 1000000) as i128);
if allow_nulls && i % 10 == 0 {
b.append_null();
c4_bool.append_null();
e.append_null();
f.append_null();
g.append_null();
} else {
// test for dictionary-encoded strings
b.append_value("this string is repeated a lot");
c4_bool.append_value(i % 2 == 0)
e.append_value(i % 2 == 0);
f.append_value((i * 100000000) as i64);
g.append_value((i * 100000000) as i64);
}
}
let a = a.finish();
let b: DictionaryArray<Int32Type> = b.finish();
let c = c.finish();
let d = d.finish();
let c4_bool = c4_bool.finish();
let e = e.finish();
let f = f.finish();
let g = g.finish();
RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(a),
Arc::new(b),
Arc::new(c),
Arc::new(d),
Arc::new(c4_bool),
Arc::new(e),
Arc::new(f),
Arc::new(g),
],
)
.unwrap()
Expand Down

0 comments on commit 874a02f

Please sign in to comment.