Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Spark-compatible CAST from string to timestamp types #335

Merged
merged 30 commits into from
May 2, 2024
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
7619757
casting str to timestamp
vaibhawvipul Apr 27, 2024
0a85cde
fix format
vaibhawvipul Apr 27, 2024
fe1896c
fixing failed tests, using char as pattern
vaibhawvipul Apr 28, 2024
70f1e69
bug fixes
vaibhawvipul Apr 28, 2024
0217f13
hangling microsecond
vaibhawvipul Apr 28, 2024
36d5cc5
make format
vaibhawvipul Apr 28, 2024
8d0a0d9
bug fixes and core refactor
vaibhawvipul Apr 29, 2024
87b5e66
format code
vaibhawvipul Apr 29, 2024
b9966b7
resolving merge conflicts
vaibhawvipul Apr 29, 2024
30c442d
removing print statements
vaibhawvipul Apr 29, 2024
800e085
clippy error
vaibhawvipul Apr 29, 2024
de0cfc2
enabling cast timestamp test case
vaibhawvipul Apr 29, 2024
fe18d81
code refactor
vaibhawvipul Apr 30, 2024
938b0b3
comet spark test case
vaibhawvipul Apr 30, 2024
cc60cfa
adding all the supported format in test
vaibhawvipul Apr 30, 2024
d250817
merge conflict resolved
vaibhawvipul May 1, 2024
889f91b
resolve conflicts
vaibhawvipul May 1, 2024
9dad369
fallback spark when timestamp not utc
vaibhawvipul May 1, 2024
825fe5e
bug fix
vaibhawvipul May 1, 2024
5041192
bug fix
vaibhawvipul May 1, 2024
9a8dc1d
adding an explainer commit
vaibhawvipul May 1, 2024
2980176
fix test case
vaibhawvipul May 1, 2024
2ffea83
bug fix
vaibhawvipul May 2, 2024
6db8115
bug fix
vaibhawvipul May 2, 2024
a67ccf7
better error handling for unwrap in fn parse_str_to_time_only_timestamp
vaibhawvipul May 2, 2024
b7a3961
remove unwrap from macro
vaibhawvipul May 2, 2024
2f3ab08
improving error handling
vaibhawvipul May 2, 2024
7a39136
adding tests for invalid inputs
vaibhawvipul May 2, 2024
4743742
removed all unwraps from timestamp cast functions
vaibhawvipul May 2, 2024
8c4ad72
code format
vaibhawvipul May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
hangling microsecond
vaibhawvipul committed Apr 28, 2024
commit 0217f13d136d76fe99759bfce9de0114d9e7f338
79 changes: 54 additions & 25 deletions core/src/execution/datafusion/expressions/cast.rs
Original file line number Diff line number Diff line change
@@ -25,14 +25,15 @@ use std::{
use crate::errors::{CometError, CometResult};
use arrow::{
compute::{cast_with_options, CastOptions},
datatypes::TimestampMillisecondType,
datatypes::TimestampMicrosecondType,
record_batch::RecordBatch,
util::display::FormatOptions,
};
use arrow_array::{
Array, ArrayRef, BooleanArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray,
};
use arrow_schema::{DataType, Schema};
use chrono::{TimeZone, Timelike};
use datafusion::logical_expr::ColumnarValue;
use datafusion_common::{internal_err, Result as DataFusionResult, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
@@ -73,7 +74,7 @@ pub struct Cast {
macro_rules! cast_utf8_to_timestamp {
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident) => {{
let len = $array.len();
let mut cast_array = PrimitiveArray::<$array_type>::builder(len);
let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone("UTC");
for i in 0..len {
if $array.is_null(i) {
cast_array.append_null()
@@ -127,8 +128,7 @@ impl Cast {
(DataType::LargeUtf8, DataType::Boolean) => {
Self::spark_cast_utf8_to_boolean::<i64>(&array, self.eval_mode)?
}
(DataType::UInt8, DataType::Timestamp(_, _)) => {
println!("Casting UInt8 to Timestamp");
(DataType::Utf8, DataType::Timestamp(_, _)) => {
Self::cast_string_to_timestamp(&array, to_type, self.eval_mode)?
}
_ => cast_with_options(&array, to_type, &CAST_OPTIONS)?,
@@ -152,8 +152,8 @@ impl Cast {
cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMillisecondType,
parse_timestamp
TimestampMicrosecondType,
timstamp_parser
)
}
_ => unreachable!("Invalid data type {:?} in cast from string", to_type),
@@ -275,7 +275,7 @@ impl PhysicalExpr for Cast {
}
}

fn parse_timestamp(value: &str, eval_mode: EvalMode) -> CometResult<Option<i64>> {
fn timstamp_parser(value: &str, eval_mode: EvalMode) -> CometResult<Option<i64>> {
let value = value.trim();
if value.is_empty() {
return Ok(None);
@@ -339,9 +339,8 @@ fn parse_timestamp(value: &str, eval_mode: EvalMode) -> CometResult<Option<i64>>
}

fn parse_ymd_timestamp(year: i32, month: u32, day: u32) -> CometResult<Option<i64>> {
let datetime = chrono::NaiveDate::from_ymd_opt(year, month, day);
let timestamp = datetime.unwrap().and_hms_micro_opt(0, 0, 0, 0);
Ok(Some(timestamp.unwrap().and_utc().timestamp_micros()))
let datetime = chrono::Utc.with_ymd_and_hms(year, month, day, 0, 0, 0).unwrap();
Ok(Some(datetime.timestamp_micros()))
}

fn parse_hms_timestamp(
@@ -353,11 +352,8 @@ fn parse_hms_timestamp(
second: u32,
microsecond: u32,
) -> CometResult<Option<i64>> {
let datetime = chrono::NaiveDate::from_ymd_opt(year, month, day);
let timestamp = datetime
.unwrap()
.and_hms_micro_opt(hour, minute, second, microsecond);
Ok(Some(timestamp.unwrap().and_utc().timestamp_micros()))
let datetime = chrono::Utc.with_ymd_and_hms(year, month, day, hour, minute, second).unwrap().with_nanosecond(microsecond * 1000);
Ok(Some(datetime.unwrap().timestamp_micros()))
}

fn get_timestamp_values(value: &str, timestamp_type: &str) -> CometResult<Option<i64>> {
@@ -429,49 +425,82 @@ fn parse_str_to_time_only_timestamp(value: &str) -> CometResult<Option<i64>> {
let microsecond = time_values
.get(3)
.map_or(0, |ms| ms.parse::<u32>().unwrap_or(0));
let datetime = chrono::Local::now().to_utc().date_naive();
let datetime = chrono::Utc::now().date_naive();
let timestamp = datetime.and_hms_micro_opt(hour, minute, second, microsecond);

Ok(Some(timestamp.unwrap().and_utc().timestamp_micros()))
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::TimestampMicrosecondType;
use arrow_array::StringArray;
use arrow_schema::TimeUnit;

#[test]
fn parse_timestamp_test() {
fn timstamp_parser_test() {
// write for all formats
assert_eq!(
parse_timestamp("2020", EvalMode::Legacy).unwrap(),
timstamp_parser("2020", EvalMode::Legacy).unwrap(),
Some(1577836800000000)
);
assert_eq!(
parse_timestamp("2020-01", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01", EvalMode::Legacy).unwrap(),
Some(1577836800000000)
);
assert_eq!(
parse_timestamp("2020-01-01", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01-01", EvalMode::Legacy).unwrap(),
Some(1577836800000000)
);
assert_eq!(
parse_timestamp("2020-01-01T12", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01-01T12", EvalMode::Legacy).unwrap(),
Some(1577880000000000)
);
assert_eq!(
parse_timestamp("2020-01-01T12:34", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01-01T12:34", EvalMode::Legacy).unwrap(),
Some(1577882040000000)
);
assert_eq!(
parse_timestamp("2020-01-01T12:34:56", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01-01T12:34:56", EvalMode::Legacy).unwrap(),
Some(1577882096000000)
);
assert_eq!(
parse_timestamp("2020-01-01T12:34:56.123456", EvalMode::Legacy).unwrap(),
timstamp_parser("2020-01-01T12:34:56.123456", EvalMode::Legacy).unwrap(),
Some(1577882096123456)
);
assert_eq!(
parse_timestamp("T2", EvalMode::Legacy).unwrap(),
timstamp_parser("T2", EvalMode::Legacy).unwrap(),
Some(1714269600000000)
);
}

#[test]
fn test_cast_string_to_timestamp() {
let array: ArrayRef = Arc::new(StringArray::from(vec![
Some("2020-01-01T12:34:56.123456"),
Some("T2"),
]));

let string_array = array
.as_any()
.downcast_ref::<GenericStringArray<i32>>()
.expect("Expected a string array");

let eval_mode = EvalMode::Legacy;
let result = cast_utf8_to_timestamp!(
&string_array,
eval_mode,
TimestampMicrosecondType,
timstamp_parser
);

println!("{:?}", result);

assert_eq!(
result.data_type(),
&DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
);
assert_eq!(result.len(), 2);
}
}