fix: Overflow when reading Timestamp from parquet file #542

Merged · 2 commits · Jun 12, 2024
Changes from all commits
60 changes: 28 additions & 32 deletions core/src/parquet/read/values.rs
@@ -727,6 +727,17 @@ const INT96_SRC_BYTE_WIDTH: usize = 12;
 // We convert INT96 to micros and store using i64
 const INT96_DST_BYTE_WIDTH: usize = 8;
 
+fn int96_to_microsecond(v: &[u8]) -> i64 {
+    let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
+    let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
+
+    unsafe {
+        ((day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64)
+            .wrapping_mul(MICROS_PER_DAY)
+            .wrapping_add(nanos.read_unaligned() / 1000)
+    }
+}
+
 /// Decode timestamps represented as INT96 into i64 with micros precision
 impl PlainDecoding for Int96TimestampMicrosType {
     #[inline]
@@ -736,51 +747,36 @@ impl PlainDecoding for Int96TimestampMicrosType {
         if !src.read_options.use_legacy_date_timestamp_or_ntz {
             let mut offset = src.offset;
             for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
                 // TODO: optimize this further as checking value one by one is not very efficient
-                unsafe {
-                    let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64
-                        * MICROS_PER_DAY
-                        + nanos.read_unaligned() / 1000;
+                let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                    if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
-                        panic!(
-                            "Encountered timestamp value {}, which is before 1582-10-15 (counting \
+                if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) {
+                    panic!(
+                        "Encountered timestamp value {}, which is before 1582-10-15 (counting \
                         backwards from Unix epoch date 1970-01-01), and could be ambiguous \
                         depending on whether a legacy Julian/Gregorian hybrid calendar is used, \
                         or a Proleptic Gregorian calendar is used.",
-                            micros
-                        );
-                    }
-
-                    offset += INT96_SRC_BYTE_WIDTH;
+                        micros
+                    );
                 }
+
+                offset += INT96_SRC_BYTE_WIDTH;
             }
         }
 
         let mut offset = src.offset;
         let mut dst_offset = INT96_DST_BYTE_WIDTH * dst.num_values;
-        unsafe {
-            for _ in 0..num {
-                let v = &src_data[offset..offset + INT96_SRC_BYTE_WIDTH];
-                let nanos = &v[..INT96_DST_BYTE_WIDTH] as *const [u8] as *const u8 as *const i64;
-                let day = &v[INT96_DST_BYTE_WIDTH..] as *const [u8] as *const u8 as *const i32;
-
-                let micros = (day.read_unaligned() - JULIAN_DAY_OF_EPOCH) as i64 * MICROS_PER_DAY
-                    + nanos.read_unaligned() / 1000;
+        for _ in 0..num {
+            let micros = int96_to_microsecond(&src_data[offset..offset + INT96_SRC_BYTE_WIDTH]);
 
-                bit::memcpy_value(
-                    &micros,
-                    INT96_DST_BYTE_WIDTH,
-                    &mut dst.value_buffer[dst_offset..],
-                );
+            bit::memcpy_value(
+                &micros,
+                INT96_DST_BYTE_WIDTH,
+                &mut dst.value_buffer[dst_offset..],
+            );
 
-                offset += INT96_SRC_BYTE_WIDTH;
-                dst_offset += INT96_DST_BYTE_WIDTH;
-            }
+            offset += INT96_SRC_BYTE_WIDTH;
+            dst_offset += INT96_DST_BYTE_WIDTH;
         }
 
         src.offset = offset;
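For reference, below is a standalone sketch of the same conversion, not the PR's code: it swaps the raw unaligned pointer reads for safe from_le_bytes calls and makes the assumptions explicit. The INT96 layout is 8 little-endian bytes of nanoseconds-of-day followed by 4 bytes of Julian day number, and the two constants are assumed to mirror JULIAN_DAY_OF_EPOCH and MICROS_PER_DAY in values.rs.

    // Standalone sketch of the INT96 conversion (not the PR's code): safe
    // from_le_bytes reads instead of raw pointers, constants assumed to match
    // values.rs. INT96 layout: bytes 0..8 = nanoseconds-of-day (LE i64),
    // bytes 8..12 = Julian day number (LE i32).
    const JULIAN_DAY_OF_EPOCH: i32 = 2_440_588; // Julian day of 1970-01-01
    const MICROS_PER_DAY: i64 = 86_400_000_000;

    fn int96_to_microsecond(v: &[u8]) -> i64 {
        let nanos = i64::from_le_bytes(v[..8].try_into().unwrap());
        let day = i32::from_le_bytes(v[8..12].try_into().unwrap());

        // Plain `*` / `+` panic in debug builds ("attempt to multiply with
        // overflow") once `day` is far enough from the epoch; wrapping_mul /
        // wrapping_add wrap in two's complement instead, like the Java `long`
        // arithmetic Spark uses for timestamp micros.
        ((day - JULIAN_DAY_OF_EPOCH) as i64)
            .wrapping_mul(MICROS_PER_DAY)
            .wrapping_add(nanos / 1000)
    }

    fn main() {
        // One day and 1000 ns past the epoch should decode to MICROS_PER_DAY + 1.
        let mut v = [0u8; 12];
        v[..8].copy_from_slice(&1_000i64.to_le_bytes());
        v[8..].copy_from_slice(&(JULIAN_DAY_OF_EPOCH + 1).to_le_bytes());
        assert_eq!(int96_to_microsecond(&v), MICROS_PER_DAY + 1);
    }

The design point of the fix: wrapping_mul/wrapping_add reproduce the silent two's-complement wrap of Java long arithmetic, so an out-of-range value yields the same result Spark itself would produce instead of a debug-build Rust panic.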
10 changes: 9 additions & 1 deletion spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -780,7 +780,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("cast TimestampType to LongType") {
     assume(CometSparkSessionExtensions.isSpark33Plus)
-    castTest(generateTimestamps(), DataTypes.LongType)
+    castTest(generateTimestampsExtended(), DataTypes.LongType)
   }
 
   ignore("cast TimestampType to FloatType") {
@@ -884,6 +884,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     withNulls(values).toDF("b").withColumn("a", col("b").cast(DataTypes.DateType)).drop("b")
   }
 
+  // Extended values are timestamps that fall outside the date range supported by
+  // chrono::DateTime and are therefore not supported by operations that use it.
+  private def generateTimestampsExtended(): DataFrame = {
+    val values = Seq("290000-12-31T01:00:00+02:00")
+    generateTimestamps().unionByName(
+      values.toDF("str").select(col("str").cast(DataTypes.TimestampType).as("a")))
+  }
+
   private def generateTimestamps(): DataFrame = {
     val values =
       Seq(
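A quick back-of-the-envelope check, a sketch assuming timestamps are stored as i64 microseconds since the Unix epoch as Spark does, shows where the extended test value sits: year 290000 is still representable in i64 micros, but it lies well outside the roughly ±262,000-year span chrono::DateTime can model, which is what the new comment in the test is calling out.

    // Rough placement of the extended test value on the i64 microsecond axis.
    // Approximate figures only; assumes Spark's i64-micros timestamp encoding.
    fn main() {
        const MICROS_PER_DAY: i64 = 86_400_000_000;
        // ~288,030 years from 1970 to 290000, at ~365.2425 days per year.
        let approx_days: i64 = (290_000 - 1_970) * 3_652_425 / 10_000;
        let approx_micros = approx_days * MICROS_PER_DAY;
        println!("year 290000 ≈ {approx_micros} us"); // ~9.1e18
        println!("i64::MAX    = {}", i64::MAX);       // ~9.2e18
        // Representable as i64 micros, but far beyond the ~±262,000-year range
        // chrono::DateTime supports, hence the comment in the test above.
    }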