From 445b6bffbe0c27bca91019a42ab508b05617700c Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Sat, 8 Jun 2024 14:49:18 +0200 Subject: [PATCH 1/2] fix: Overflow when reading Timestamp from parquet file --- core/src/parquet/read/values.rs | 11 ++++++----- .../test/scala/org/apache/comet/CometCastSuite.scala | 10 +++++++++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/parquet/read/values.rs b/core/src/parquet/read/values.rs index c2b1b6e64..2b9e12571 100644 --- a/core/src/parquet/read/values.rs +++ b/core/src/parquet/read/values.rs @@ -742,9 +742,9 @@ impl PlainDecoding for Int96TimestampMicrosType { // TODO: optimize this further as checking value one by one is not very efficient unsafe { - let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 - * MICROS_PER_DAY - + nanos.read_unaligned() / 1000; + let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64) + .wrapping_mul(MICROS_PER_DAY) + .wrapping_add(nanos.read_unaligned() / 1000); if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) { panic!( @@ -769,8 +769,9 @@ impl PlainDecoding for Int96TimestampMicrosType { let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64; let day = &v[INT96_DST_BYTE_WIDTH..] 
as *const [u8] as *const u8 as *const i32; - let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY - + nanos.read_unaligned() / 1000; + let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64) + .wrapping_mul(MICROS_PER_DAY) + .wrapping_add(nanos.read_unaligned() / 1000); bit::memcpy_value( &micros, diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 71134e550..31d718d44 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast TimestampType to LongType") { assume(CometSparkSessionExtensions.isSpark33Plus) - castTest(generateTimestamps(), DataTypes.LongType) + castTest(generateTimestampsExtended(), DataTypes.LongType) } ignore("cast TimestampType to FloatType") { @@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b") } + // Extended values are Timestamps that are outside dates supported by chrono::DateTime and + // therefore not supported by operations using it. 
+ private def generateTimestampsExtended(): DataFrame = { + val values = Seq("290000-12-31T01:00:00+02:00") + generateTimestamps().unionByName( + values.toDF("str").select(col("str").cast(DataTypes.TimestampType).as("a"))) + } + private def generateTimestamps(): DataFrame = { val values = Seq( From a4be042fbde46dbb611906d780c71b697e785e37 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Tue, 11 Jun 2024 17:08:34 +0200 Subject: [PATCH 2/2] Add helper method --- core/src/parquet/read/values.rs | 61 +++++++++++++++------------------ 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/core/src/parquet/read/values.rs b/core/src/parquet/read/values.rs index 2b9e12571..76c8a4a1d 100644 --- a/core/src/parquet/read/values.rs +++ b/core/src/parquet/read/values.rs @@ -727,6 +727,17 @@ const INT96_SRC_BYTE_WIDTH: usize = 12; // We convert INT96 to micros and store using i64 const INT96_DST_BYTE_WIDTH: usize = 8; +fn int96_to_microsecond(v: &[u8]) -> i64 { + let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64; + let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32; + + unsafe { + ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64) + .wrapping_mul(MICROS_PER_DAY) + .wrapping_add(nanos.read_unaligned() / 1000) + } +} + /// Decode timestamps represented as INT96 into i64 with micros precision impl PlainDecoding for Int96TimestampMicrosType { #[inline] @@ -736,52 +747,36 @@ impl PlainDecoding for Int96TimestampMicrosType { if !src.read_options.use_legacy_date_timestamp_or_ntz { let mut offset = src.offset; for _ in 0..num { - let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH]; - let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64; - let day = &v[INT96_DST_BYTE_WIDTH..] 
as *const [u8] as *const u8 as *const i32; - // TODO: optimize this further as checking value one by one is not very efficient - unsafe { - let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64) - .wrapping_mul(MICROS_PER_DAY) - .wrapping_add(nanos.read_unaligned() / 1000); + let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]); - if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) { - panic!( - "Encountered timestamp value {}, which is before 1582-10-15 (counting \ + if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) { + panic!( + "Encountered timestamp value {}, which is before 1582-10-15 (counting \ backwards from Unix eopch date 1970-01-01), and could be ambigous \ depending on whether a legacy Julian/Gregorian hybrid calendar is used, \ or a Proleptic Gregorian calendar is used.", - micros - ); - } - - offset += INT96_SRC_BYTE_WIDTH; + micros + ); } + + offset += INT96_SRC_BYTE_WIDTH; } } let mut offset = src.offset; let mut dst_offset = INT96_DST_BYTE_WIDTH * dst.num_values; - unsafe { - for _ in 0..num { - let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH]; - let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64; - let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32; - - let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64) - .wrapping_mul(MICROS_PER_DAY) - .wrapping_add(nanos.read_unaligned() / 1000); + for _ in 0..num { + let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]); - bit::memcpy_value( - &micros, - INT96_DST_BYTE_WIDTH, - &mut dst.value_buffer[dst_offset..], - ); + bit::memcpy_value( + &micros, + INT96_DST_BYTE_WIDTH, + &mut dst.value_buffer[dst_offset..], + ); - offset += INT96_SRC_BYTE_WIDTH; - dst_offset += INT96_DST_BYTE_WIDTH; - } + offset += INT96_SRC_BYTE_WIDTH; + dst_offset += INT96_DST_BYTE_WIDTH; } src.offset = offset;