From bf6b4d45b1175801f8b6e2a25c28147a360ea2f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 4 Dec 2024 14:01:56 -0700 Subject: [PATCH] support more timestamp conversions --- native/spark-expr/src/utils.rs | 41 +++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/native/spark-expr/src/utils.rs b/native/spark-expr/src/utils.rs index db4ad1956..2fc8de974 100644 --- a/native/spark-expr/src/utils.rs +++ b/native/spark-expr/src/utils.rs @@ -19,7 +19,7 @@ use arrow_array::{ cast::as_primitive_array, types::{Int32Type, TimestampMicrosecondType}, }; -use arrow_schema::{ArrowError, DataType}; +use arrow_schema::{ArrowError, DataType, TimeUnit}; use std::sync::Arc; use crate::timezone::Tz; @@ -27,6 +27,7 @@ use arrow::{ array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray}, temporal_conversions::as_datetime, }; +use arrow_array::types::TimestampMillisecondType; use chrono::{DateTime, Offset, TimeZone}; /// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or @@ -70,6 +71,9 @@ pub fn array_with_timezone( Some(DataType::Timestamp(_, Some(_))) => { timestamp_ntz_to_timestamp(array, timezone.as_str(), Some(timezone.as_str())) } + Some(DataType::Timestamp(_, None)) => { + timestamp_ntz_to_timestamp(array, timezone.as_str(), None) + } _ => { // Not supported panic!( @@ -80,7 +84,7 @@ pub fn array_with_timezone( } } } - DataType::Timestamp(_, Some(_)) => { + DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => { assert!(!timezone.is_empty()); let array = as_primitive_array::(&array); let array_with_timezone = array.clone().with_timezone(timezone.clone()); @@ -92,6 +96,18 @@ pub fn array_with_timezone( _ => Ok(array), } } + DataType::Timestamp(TimeUnit::Millisecond, Some(_)) => { + assert!(!timezone.is_empty()); + let array = as_primitive_array::(&array); + let array_with_timezone = array.clone().with_timezone(timezone.clone()); + let array = Arc::new(array_with_timezone) as ArrayRef; + match to_type { + Some(DataType::Utf8) | Some(DataType::Date32) => { + pre_timestamp_cast(array, timezone) + } + _ => Ok(array), + } + } DataType::Dictionary(_, value_type) if matches!(value_type.as_ref(), &DataType::Timestamp(_, _)) => { @@ -127,7 +143,7 @@ fn timestamp_ntz_to_timestamp( ) -> Result { assert!(!tz.is_empty()); match array.data_type() { - DataType::Timestamp(_, None) => { + DataType::Timestamp(TimeUnit::Microsecond, None) => { let array = as_primitive_array::(&array); let tz: Tz = tz.parse()?; let array: PrimitiveArray = array.try_unary(|value| { @@ -146,6 +162,25 @@ fn timestamp_ntz_to_timestamp( }; Ok(Arc::new(array_with_tz)) } + DataType::Timestamp(TimeUnit::Millisecond, None) => { + let array = as_primitive_array::(&array); + let tz: Tz = tz.parse()?; + let array: PrimitiveArray = array.try_unary(|value| { + as_datetime::(value) + .ok_or_else(|| datetime_cast_err(value)) + .map(|local_datetime| { + let datetime: DateTime = + tz.from_local_datetime(&local_datetime).unwrap(); + datetime.timestamp_millis() + }) + })?; + let array_with_tz = if let Some(to_tz) = to_timezone { + array.with_timezone(to_tz) + } else { + array + }; + Ok(Arc::new(array_with_tz)) + } _ => Ok(array), } }