Skip to content

Commit

Permalink
fix(avro): correct parse timestamp data before Epoch (#14832)
Browse files Browse the repository at this point in the history
Co-authored-by: xiangjinwu <[email protected]>
  • Loading branch information
xuefengze and xiangjinwu authored Jan 29, 2024
1 parent 7e937c2 commit 5f7e557
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 22 deletions.
6 changes: 6 additions & 0 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ impl Timestamp {
self.0.timestamp_nanos_opt().unwrap()
}

pub fn with_millis(timestamp_millis: i64) -> Result<Self> {
let secs = timestamp_millis.div_euclid(1_000);
let nsecs = timestamp_millis.rem_euclid(1_000) * 1_000_000;
Self::with_secs_nsecs(secs, nsecs as u32)
}

pub fn with_micros(timestamp_micros: i64) -> Result<Self> {
let secs = timestamp_micros.div_euclid(1_000_000);
let nsecs = timestamp_micros.rem_euclid(1_000_000) * 1000;
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ fn avro_type_mapping(schema: &Schema) -> anyhow::Result<DataType> {
DataType::Decimal
}
Schema::Date => DataType::Date,
Schema::LocalTimestampMillis => DataType::Timestamp,
Schema::LocalTimestampMicros => DataType::Timestamp,
Schema::TimestampMillis => DataType::Timestamptz,
Schema::TimestampMicros => DataType::Timestamptz,
Schema::Duration => DataType::Interval,
Expand Down
53 changes: 31 additions & 22 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ use chrono::Datelike;
use itertools::Itertools;
use num_bigint::{BigInt, Sign};
use risingwave_common::array::{ListValue, StructValue};
use risingwave_common::cast::{i64_to_timestamp, i64_to_timestamptz};
use risingwave_common::error::Result as RwResult;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time};
use risingwave_common::types::{
DataType, Date, Datum, Interval, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
};
use risingwave_common::util::iter_util::ZipEqFast;

use super::{Access, AccessError, AccessResult};
Expand Down Expand Up @@ -181,19 +182,27 @@ impl<'a> AvroParseOptions<'a> {
}
(Some(DataType::Varchar) | None, Value::String(s)) => s.clone().into_boxed_str().into(),
// ---- Timestamp -----
(Some(DataType::Timestamp) | None, Value::TimestampMillis(ms)) => {
i64_to_timestamp(*ms).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMillis(ms)) => {
Timestamp::with_millis(*ms)
.map_err(|_| create_error())?
.into()
}
(Some(DataType::Timestamp) | None, Value::TimestampMicros(us)) => {
i64_to_timestamp(*us).map_err(|_| create_error())?.into()
(Some(DataType::Timestamp) | None, Value::LocalTimestampMicros(us)) => {
Timestamp::with_micros(*us)
.map_err(|_| create_error())?
.into()
}

// ---- TimestampTz -----
(Some(DataType::Timestamptz), Value::TimestampMillis(ms)) => {
i64_to_timestamptz(*ms).map_err(|_| create_error())?.into()
(Some(DataType::Timestamptz) | None, Value::TimestampMillis(ms)) => {
Timestamptz::from_millis(*ms)
.ok_or(AccessError::Other(anyhow!(
"timestamptz with milliseconds {ms} * 1000 is out of range",
)))?
.into()
}
(Some(DataType::Timestamptz), Value::TimestampMicros(us)) => {
i64_to_timestamptz(*us).map_err(|_| create_error())?.into()
(Some(DataType::Timestamptz) | None, Value::TimestampMicros(us)) => {
Timestamptz::from_micros(*us).into()
}

// ---- Interval -----
Expand Down Expand Up @@ -424,7 +433,7 @@ pub(crate) fn unix_epoch_days() -> i32 {
mod tests {
use apache_avro::Decimal as AvroDecimal;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::{Decimal, Timestamp};
use risingwave_common::types::{Decimal, Timestamptz};

use super::*;

Expand Down Expand Up @@ -486,24 +495,24 @@ mod tests {
}

#[test]
fn test_avro_timestamp_micros() {
let v1 = Value::TimestampMicros(1620000000000);
let v2 = Value::TimestampMillis(1620000000);
fn test_avro_timestamptz_micros() {
let v1 = Value::TimestampMicros(1620000000000000);
let v2 = Value::TimestampMillis(1620000000000);
let value_schema1 = Schema::TimestampMicros;
let value_schema2 = Schema::TimestampMillis;
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamp).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamp).unwrap();
let datum1 = from_avro_value(v1, &value_schema1, &DataType::Timestamptz).unwrap();
let datum2 = from_avro_value(v2, &value_schema2, &DataType::Timestamptz).unwrap();
assert_eq!(
datum1,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
Some(ScalarImpl::Timestamptz(
Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
))
);
assert_eq!(
datum2,
Some(ScalarImpl::Timestamp(Timestamp::new(
"2021-05-03T00:00:00".parse().unwrap()
)))
Some(ScalarImpl::Timestamptz(
Timestamptz::from_str("2021-05-03T00:00:00Z").unwrap()
))
);
}

Expand Down

0 comments on commit 5f7e557

Please sign in to comment.