Skip to content

Commit

Permalink
fix: Overflow when reading Timestamp from parquet file
Browse files Browse the repository at this point in the history
  • Loading branch information
eejbyfeldt committed Jun 11, 2024
1 parent e07f24c commit 445b6bf
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
11 changes: 6 additions & 5 deletions core/src/parquet/read/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,9 +742,9 @@ impl PlainDecoding for Int96TimestampMicrosType {

// TODO: optimize this further as checking value one by one is not very efficient
unsafe {
let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64
* MICROS_PER_DAY
+ nanos.read_unaligned() / 1000;
let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
.wrapping_mul(MICROS_PER_DAY)
.wrapping_add(nanos.read_unaligned() / 1000);

if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
panic!(
Expand All @@ -769,8 +769,9 @@ impl PlainDecoding for Int96TimestampMicrosType {
let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;

let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY
+ nanos.read_unaligned() / 1000;
let micros = ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
.wrapping_mul(MICROS_PER_DAY)
.wrapping_add(nanos.read_unaligned() / 1000);

bit::memcpy_value(
&micros,
Expand Down
10 changes: 9 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("cast TimestampType to LongType") {
// Gate on Spark 3.3+ — presumably earlier versions have different timestamp-to-long
// cast semantics; TODO confirm against CometSparkSessionExtensions.
assume(CometSparkSessionExtensions.isSpark33Plus)
// NOTE(review): this is a diff rendering — the first castTest line is the pre-change
// call (removed by this commit) and the second is its replacement using the extended
// value set; after the commit only the generateTimestampsExtended() variant remains.
castTest(generateTimestamps(), DataTypes.LongType)
castTest(generateTimestampsExtended(), DataTypes.LongType)
}

ignore("cast TimestampType to FloatType") {
Expand Down Expand Up @@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b")
}

// Extended values are timestamps that fall outside the date range supported by
// chrono::DateTime and are therefore not supported by operations that use it.
// NOTE(review): the year-290000 value presumably exercises the i64 overflow path
// fixed in the companion values.rs change (wrapping_mul/wrapping_add) — confirm.
private def generateTimestampsExtended(): DataFrame = {
// A single extreme timestamp, far beyond chrono's representable range.
val values = Seq("290000-12-31T01:00:00+02:00")
// Append the extreme value to the standard generator's output: the string is cast
// through Spark's TimestampType and unioned by name into column "a".
generateTimestamps().unionByName(
values.toDF("str").select(col("str").cast(DataTypes.TimestampType).as("a")))
}

private def generateTimestamps(): DataFrame = {
val values =
Seq(
Expand Down

0 comments on commit 445b6bf

Please sign in to comment.