Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Overflow when reading Timestamp from parquet file (apache#542)
Browse files Browse the repository at this point in the history
* fix: Overflow when reading Timestamp from parquet file

* Add helper method
eejbyfeldt authored and kazuyukitanimura committed Jul 1, 2024
1 parent ea74aa5 commit 0103775
Showing 2 changed files with 37 additions and 33 deletions.
60 changes: 28 additions & 32 deletions core/src/parquet/read/values.rs
Original file line number Diff line number Diff line change
@@ -727,6 +727,17 @@ const INT96_SRC_BYTE_WIDTH: usize = 12;
// We convert INT96 to micros and store using i64
const INT96_DST_BYTE_WIDTH: usize = 8;

fn int96_to_microsecond(v: &[u8]) -> i64 {
let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;

unsafe {
((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
.wrapping_mul(MICROS_PER_DAY)
.wrapping_add(nanos.read_unaligned() / 1000)
}
}

/// Decode timestamps represented as INT96 into i64 with micros precision
impl PlainDecoding for Int96TimestampMicrosType {
#[inline]
@@ -736,51 +747,36 @@ impl PlainDecoding for Int96TimestampMicrosType {
if !src.read_options.use_legacy_date_timestamp_or_ntz {
let mut offset = src.offset;
for _ in 0..num {
let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;

// TODO: optimize this further as checking value one by one is not very efficient
unsafe {
let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64
* MICROS_PER_DAY
+ nanos.read_unaligned() / 1000;
let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);

if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
panic!(
"Encountered timestamp value {}, which is before 1582-10-15 (counting \
if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
panic!(
"Encountered timestamp value {}, which is before 1582-10-15 (counting \
backwards from Unix epoch date 1970-01-01), and could be ambiguous \
depending on whether a legacy Julian/Gregorian hybrid calendar is used, \
or a Proleptic Gregorian calendar is used.",
micros
);
}

offset += INT96_SRC_BYTE_WIDTH;
micros
);
}

offset += INT96_SRC_BYTE_WIDTH;
}
}

let mut offset = src.offset;
let mut dst_offset = INT96_DST_BYTE_WIDTH * dst.num_values;
unsafe {
for _ in 0..num {
let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;

let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY
+ nanos.read_unaligned() / 1000;
for _ in 0..num {
let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);

bit::memcpy_value(
&micros,
INT96_DST_BYTE_WIDTH,
&mut dst.value_buffer[dst_offset..],
);
bit::memcpy_value(
&micros,
INT96_DST_BYTE_WIDTH,
&mut dst.value_buffer[dst_offset..],
);

offset += INT96_SRC_BYTE_WIDTH;
dst_offset += INT96_DST_BYTE_WIDTH;
}
offset += INT96_SRC_BYTE_WIDTH;
dst_offset += INT96_DST_BYTE_WIDTH;
}

src.offset = offset;
10 changes: 9 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
Original file line number Diff line number Diff line change
@@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("cast TimestampType to LongType") {
assume(CometSparkSessionExtensions.isSpark33Plus)
castTest(generateTimestamps(), DataTypes.LongType)
castTest(generateTimestampsExtended(), DataTypes.LongType)
}

ignore("cast TimestampType to FloatType") {
@@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b")
}

// Extended values are timestamps that fall outside the range of dates supported by
// chrono::DateTime, and are therefore not supported by operations that use it.
// Appends a timestamp beyond the range representable by chrono::DateTime to
// the standard set of generated timestamps.
private def generateTimestampsExtended(): DataFrame = {
  val outOfRange = Seq("290000-12-31T01:00:00+02:00")
    .toDF("str")
    .select(col("str").cast(DataTypes.TimestampType).as("a"))
  generateTimestamps().unionByName(outOfRange)
}

private def generateTimestamps(): DataFrame = {
val values =
Seq(

0 comments on commit 0103775

Please sign in to comment.